/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.sysds.test;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.StringTokenizer;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.compress.CompressedMatrixBlock;
import org.apache.sysds.runtime.data.DenseBlockFP64;
import org.apache.sysds.runtime.data.SparseBlock;
import org.apache.sysds.runtime.data.TensorBlock;
import org.apache.sysds.runtime.frame.data.FrameBlock;
import org.apache.sysds.runtime.frame.data.columns.Array;
import org.apache.sysds.runtime.frame.data.columns.ArrayFactory;
import org.apache.sysds.runtime.frame.data.columns.ColumnMetadata;
import org.apache.sysds.runtime.frame.data.columns.OptionalArray;
import org.apache.sysds.runtime.frame.data.columns.StringArray;
import org.apache.sysds.runtime.frame.data.lib.FrameLibApplySchema;
import org.apache.sysds.runtime.frame.data.lib.FrameUtil;
import org.apache.sysds.runtime.functionobjects.Builtin;
import org.apache.sysds.runtime.functionobjects.Builtin.BuiltinCode;
import org.apache.sysds.runtime.io.FrameWriter;
import org.apache.sysds.runtime.io.FrameWriterFactory;
import org.apache.sysds.runtime.io.IOUtilFunctions;
import org.apache.sysds.runtime.io.MatrixReaderFactory;
import org.apache.sysds.runtime.matrix.data.MatrixBlock;
import org.apache.sysds.runtime.matrix.data.MatrixCell;
import org.apache.sysds.runtime.matrix.data.MatrixIndexes;
import org.apache.sysds.runtime.matrix.data.MatrixValue.CellIndex;
import org.apache.sysds.runtime.matrix.operators.UnaryOperator;
import org.apache.sysds.runtime.meta.DataCharacteristics;
import org.apache.sysds.runtime.meta.MatrixCharacteristics;
import org.apache.sysds.runtime.meta.MetaDataAll;
import org.apache.sysds.runtime.util.DataConverter;
import org.apache.sysds.runtime.util.UtilFunctions;
import org.junit.Assert;

/**
 * <p>
 * Provides methods to easily create tests. Implemented methods can be used for
 * </p>
 * <ul>
 * <li>data comparison</li>
 * <li>test data generation</li>
 * <li>writing files</li>
 * <li>reading files</li>
 * <li>clean up</li>
 * </ul>
 */
public class TestUtils {

	private static final Log LOG = LogFactory.getLog(TestUtils.class.getName());

	/** job configuration used for file system access */
	public static Configuration conf = new Configuration();

	/** global random generator for default seed */
	public static Random random = new Random(System.currentTimeMillis());

	/** internal buffer to store assertion information */
	private static ArrayList<String> _AssertInfos = new ArrayList<>();
	private static boolean _AssertOccured = false;

	/* Compare expected scalar generated by Java with actual scalar generated by DML */
	public static void compareDMLScalarWithJavaScalar(String expectedFile, String actualFile, double epsilon) {
		try {
			String lineExpected = null;
			String lineActual = null;

			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(compareFile, conf);
			FSDataInputStream fsin = fs.open(compareFile);
			try( BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) {
				lineExpected = compareIn.readLine();
			}

			Path outFile = new Path(actualFile);
			FSDataInputStream fsout = fs.open(outFile);
			try( BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout)) ) {
				lineActual = outIn.readLine();
			}

			assertEquals(expectedFile + ": " + lineExpected + " vs " + actualFile + ": " + lineActual,
					Double.parseDouble(lineExpected), Double.parseDouble(lineActual), epsilon);
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * Compares contents of an expected file with the actual file, where rows may be permuted
	 * @param expectedFile
	 * @param actualDir
	 * @param epsilon
	 */
	public static void compareDMLMatrixWithJavaMatrixRowsOutOfOrder(String expectedFile, String actualDir, double epsilon)
	{
		try {
			HashMap<CellIndex, Double> expectedValues = new HashMap<>();

			Path outDirectory = new Path(actualDir);
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FSDataInputStream fsin = fs.open(compareFile);
			readValuesFromFileStream(fsin, expectedValues);

			HashMap<CellIndex, Double> actualValues = new HashMap<>();

			FileStatus[] outFiles = fs.listStatus(outDirectory);

			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				readValuesFromFileStream(fsout, actualValues);
			}

			ArrayList<Double> e_list = new ArrayList<>();
			for (CellIndex index : expectedValues.keySet()) {
				Double expectedValue = expectedValues.get(index);
				if(expectedValue != 0.0)
					e_list.add(expectedValue);
			}

			ArrayList<Double> a_list = new ArrayList<>();
			for (CellIndex index : actualValues.keySet()) {
				Double actualValue = actualValues.get(index);
				if(actualValue != 0.0)
					a_list.add(actualValue);
			}

			Collections.sort(e_list);
			Collections.sort(a_list);

			assertTrue("Matrix nzs not equal", e_list.size() == a_list.size());
			for(int i=0; i < e_list.size(); i++)
			{
				assertTrue("Matrix values not equals", Math.abs(e_list.get(i) - a_list.get(i)) <= epsilon);
			}

		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Compares the expected values calculated in Java by testcase and which are
	 * in the normal filesystem, with those calculated by SystemDS located in
	 * HDFS with Matrix Market format
	 * </p>
	 *
	 * @param expectedFile
	 *            file with expected values, which is located in OS filesystem
	 * @param actualDir
	 *            file with actual values, which is located in HDFS
	 * @param epsilon
	 *            tolerance for value comparison
	 */
	public static void compareMMMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) {
		try {
			Path outDirectory = new Path(actualDir);
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FSDataInputStream fsin = fs.open(compareFile);

			HashMap<CellIndex, Double> expectedValues = new HashMap<>();
			String[] expRcn = null;

			try(BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin)) ) {
				// skip the header of Matrix Market file
				String line = compareIn.readLine();

				// rows, cols and nnz
				line = compareIn.readLine();
				expRcn = line.split(" ");

				readValuesFromFileStreamAndPut(compareIn, expectedValues);
			}

			HashMap<CellIndex, Double> actualValues = new HashMap<>();

			FSDataInputStream fsout = fs.open(outDirectory);
			try( BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout)) ) {

				//skip MM header
				String line = outIn.readLine();

				//rows, cols and nnz
				line = outIn.readLine();
				String[] rcn = line.split(" ");

				if (Integer.parseInt(expRcn[0]) != Integer.parseInt(rcn[0])) {
					LOG.warn(" Rows mismatch: expected " + Integer.parseInt(expRcn[0]) + ", actual " + Integer.parseInt(rcn[0]));
				}
				else if (Integer.parseInt(expRcn[1]) != Integer.parseInt(rcn[1])) {
					LOG.warn(" Cols mismatch: expected " + Integer.parseInt(expRcn[1]) + ", actual " + Integer.parseInt(rcn[1]));
				}
				else if (Integer.parseInt(expRcn[2]) != Integer.parseInt(rcn[2])) {
					LOG.warn(" Nnz mismatch: expected " + Integer.parseInt(expRcn[2]) + ", actual " + Integer.parseInt(rcn[2]));
				}

				readValuesFromFileStreamAndPut(outIn, actualValues);
			}

			Set<CellIndex> allKeys = new HashSet<>();
			allKeys.addAll(expectedValues.keySet());
			if(expectedValues.size() != actualValues.size())
				allKeys.addAll(actualValues.keySet());

			int countErrors = 0;
			for (CellIndex index : allKeys) {
				Double expectedValue = expectedValues.get(index);
				Double actualValue = actualValues.get(index);
				if (expectedValue == null)
					expectedValue = 0.0;
				if (actualValue == null)
					actualValue = 0.0;

				if (!compareCellValue(expectedValue, actualValue, epsilon, false)) {
					System.out.println(expectedFile+": "+index+" mismatch: expected " + expectedValue + ", actual " + actualValue);
					countErrors++;
				}
			}
			assertTrue("for file " + actualDir + " " + countErrors + " values are not equal", countErrors == 0);
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * Read doubles from the input stream and put them into the given hashmap of values.
	 * @param inputStream input stream of doubles with related indices
	 * @param values hashmap of values (initially empty)
	 * @throws IOException
	 */
	public static void readValuesFromFileStream(FSDataInputStream inputStream, HashMap<CellIndex, Double> values)
		throws IOException
	{
		try( BufferedReader inReader = new BufferedReader(new InputStreamReader(inputStream)) ) {
			readValuesFromFileStreamAndPut(inReader, values);
		}
	}

	/**
	 * Read values from file stream and put into hashmap
	 * @param inReader BufferedReader to read values from
	 * @param values hashmap where values are put
	*/
	public static void readValuesFromFileStreamAndPut(BufferedReader inReader, HashMap<CellIndex, Double> values)
		throws IOException
	{
		String line = null;
		while ((line = inReader.readLine()) != null) {
			try{

				StringTokenizer st = new StringTokenizer(line, " ");
				int i = Integer.parseInt(st.nextToken());
				int j = Integer.parseInt(st.nextToken());
				double v = Double.parseDouble(st.nextToken());
				values.put(new CellIndex(i, j), v);
			}
			catch(Exception e){
				throw new IOException("failed parsing line:" + line,e);
			}
		}
	}

	/**
	 * <p>
	 * Read the cell values of the expected file and actual files. Schema is used for correct parsing if the file is a
	 * frame and if it is null FP64 will be used for all values (useful for Matrices).
	 * </p>
	 *
	 * @param schema         the schema of the frame, can be null (for FP64)
	 * @param expectedFile   the file with expected values
	 * @param actualDir      the directory where the actual values were written
	 * @param expectedValues the HashMap where the expected values will be written to
	 * @param actualValues   the HashMap where the actual values will be written to
	 */
	private static void readActualAndExpectedFile(ValueType[] schema, String expectedFile, String actualDir,
		HashMap<CellIndex, Object> expectedValues, HashMap<CellIndex, Object> actualValues) {
		try {
			Path outDirectory = new Path(actualDir);
			Path compareFile = new Path(expectedFile);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FSDataInputStream fsin = fs.open(compareFile);

			try(BufferedReader compareIn = new BufferedReader(new InputStreamReader(fsin))) {
				String line;
				while((line = compareIn.readLine()) != null) {
					StringTokenizer st = new StringTokenizer(line, " ");
					int i = Integer.parseInt(st.nextToken());
					int j = Integer.parseInt(st.nextToken());
					ValueType vt = (schema != null) ? schema[j - 1] : ValueType.FP64;
					Object obj = UtilFunctions.stringToObject(vt, st.nextToken());
					expectedValues.put(new CellIndex(i, j), obj);
				}
			}

			FileStatus[] outFiles = fs.listStatus(outDirectory);

			for(FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
					String line;
					while((line = outIn.readLine()) != null) {
						StringTokenizer st = new StringTokenizer(line, " ");
						int i = Integer.parseInt(st.nextToken());
						int j = Integer.parseInt(st.nextToken());
						ValueType vt = (schema != null) ? schema[j - 1] : ValueType.FP64;
						Object obj = UtilFunctions.stringToObject(vt, st.nextToken());
						actualValues.put(new CellIndex(i, j), obj);
					}
				}
			}
		}
		catch(IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Compares the expected values calculated in Java by testcase and which are
	 * in the normal filesystem, with those calculated by SystemDS located in
	 * HDFS
	 * </p>
	 *
	 * @param expectedFile
	 *            file with expected values, which is located in OS filesystem
	 * @param actualDir
	 *            file with actual values, which is located in HDFS
	 * @param epsilon
	 *            tolerance for value comparison
	 */
	public static void compareDMLMatrixWithJavaMatrix(String expectedFile, String actualDir, double epsilon) {
		HashMap<CellIndex, Object> expectedValues = new HashMap<>();
		HashMap<CellIndex, Object> actualValues = new HashMap<>();

		readActualAndExpectedFile(null, expectedFile, actualDir, expectedValues, actualValues);

		Set<CellIndex> allKeys = new HashSet<>();
		allKeys.addAll(expectedValues.keySet());
		if(expectedValues.size() != actualValues.size())
			allKeys.addAll(actualValues.keySet());
		int countErrors = 0;
		for(CellIndex index : allKeys) {
			Double expectedValue = (Double) expectedValues.get(index);
			Double actualValue = (Double) actualValues.get(index);
			if(expectedValue == null)
				expectedValue = 0.0;
			if(actualValue == null)
				actualValue = 0.0;

			if(!compareCellValue(expectedValue, actualValue, epsilon, false)) {
				System.out.println(
					expectedFile + ": " + index + " mismatch: expected " + expectedValue + ", actual " + actualValue);
				countErrors++;
			}
		}
		assertEquals("for file " + actualDir + " " + countErrors + " values are not equal", 0, countErrors);
	}

	/**
	 * <p>
	 * Compares the expected values calculated in Java by testcase and which are
	 * in the normal filesystem, with those calculated by SystemDS located in
	 * HDFS
	 * </p>
	 *
	 * @param expectedFile
	 *            file with expected values, which is located in OS filesystem
	 * @param actualDir
	 *            file with actual values, which is located in HDFS
	 */
	public static void compareDMLFrameWithJavaFrame(ValueType[] schema, String expectedFile, String actualDir) {
		HashMap<CellIndex, Object> expectedValues = new HashMap<>();
		HashMap<CellIndex, Object> actualValues = new HashMap<>();

		readActualAndExpectedFile(schema, expectedFile, actualDir, expectedValues, actualValues);

		Set<CellIndex> allKeys = new HashSet<>();
		allKeys.addAll(expectedValues.keySet());
		if(expectedValues.size() != actualValues.size())
			allKeys.addAll(actualValues.keySet());
		int countErrors = 0;
		StringBuilder sb = new StringBuilder();
		for(CellIndex index : allKeys) {
			Object expectedValue = expectedValues.get(index);
			Object actualValue = actualValues.get(index);
			int j = index.column;
			expectedValue = expectedValue == null ? ArrayFactory.defaultNullValue(schema[j - 1]): expectedValue;
			actualValue = actualValue == null ? ArrayFactory.defaultNullValue(schema[j - 1]): actualValue;
			
			if(UtilFunctions.compareTo(schema[j - 1], expectedValue, actualValue) != 0) {
				sb.append(expectedFile);
				sb.append(": ");
				sb.append(index);
				sb.append(" mismatch: expected ");
				sb.append(expectedValue);
				sb.append(", actual ");
				sb.append(actualValue);
				sb.append("\n");
				countErrors++;
			}
			if(countErrors > 20)
				break;
		}
		sb.append("for file " + actualDir + " " + countErrors + " at least values are not equal");
		assertEquals(sb.toString(), 0, countErrors);
	}

	public static void compareTensorBlocks(TensorBlock tb1, TensorBlock tb2) {
		Assert.assertEquals(tb1.getValueType(), tb2.getValueType());
		Assert.assertArrayEquals(tb1.getSchema(), tb2.getSchema());
		Assert.assertEquals(tb1.getNumRows(), tb2.getNumRows());
		Assert.assertEquals(tb1.getNumColumns(), tb2.getNumColumns());
		for (int i = 0; i < tb1.getNumRows(); i++)
			for (int j = 0; j < tb1.getNumColumns(); j++)
				Assert.assertEquals(tb1.get(new int[]{i, j}), tb2.get(new int[]{i, j}));
	}

	public static TensorBlock createBasicTensor(ValueType vt, int rows, int cols, double sparsity) {
		return DataConverter.convertToTensorBlock(TestUtils.round(
			MatrixBlock.randOperations(rows, cols, sparsity, 0, 10, "uniform", 7)), vt, true);
	}

	public static TensorBlock createDataTensor(ValueType vt, int rows, int cols, double sparsity) {
		return DataConverter.convertToTensorBlock(TestUtils.round(
			MatrixBlock.randOperations(rows, cols, sparsity, 0, 10, "uniform", 7)), vt, false);
	}

	/**
	 * Reads values from a matrix file in HDFS in DML format
	 *
	 * NOTE: For reading the output of a matrix produced by a JUnit test, use the convenience
	 *       function {@link AutomatedTestBase#readDMLMatrixFromOutputDir(String)}
	 *
	 * @param filePath Path to the file to be read.
	 * @return Matrix values in a hashmap <index,value>
	 */
	public static HashMap<CellIndex, Double> readDMLMatrixFromHDFS(String filePath)
	{
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();

		Path outDirectory = new Path(filePath);
		try
		{
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream outIn = fs.open(file.getPath());
				readValuesFromFileStream(outIn, expectedValues);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			try{
				FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
				FileStatus[] outFiles = fs.listStatus(outDirectory);
				String fileContent = "";
				for (FileStatus file : outFiles) {
					FSDataInputStream outIn = fs.open(file.getPath());
					fileContent += new String(outIn.readNBytes(100));
					fileContent += "...";
				}
				fail("could not read from file " + filePath+": "+e.getMessage() + "\ncontent:\n" + fileContent);
		
			}catch (IOException e2){
				fail("could not read from file " + filePath+": "+e.getMessage());
			}
		}

		return expectedValues;
	}

	public static MatrixBlock readBinary(String path){
		try{

			MetaDataAll mba = new MetaDataAll(path + ".mtd", false, true);
			if(!mba.mtdExists())
				throw new IOException("File does not exist");
			DataCharacteristics ds = mba.getDataCharacteristics();
			FileFormat f = FileFormat.valueOf(mba.getFormatTypeString().toUpperCase());
			return MatrixReaderFactory.createMatrixReader(f)
				.readMatrixFromHDFS(path, ds.getRows(),ds.getCols(),ds.getBlocksize(),ds.getNonZeros());
			
		}
		catch(Exception e){
			throw new RuntimeException(e);
		}
	}


	/**
	 * Reads values from a matrix file in OS's FS in R format
	 *
	 * NOTE: For reading the output of a matrix produced by a R validation code of a JUnit test, use the convenience
	 *       function {@link AutomatedTestBase#readRMatrixFromExpectedDir(String)}
	 *
	 * @param filePath Path to the file to be read.
	 * @return Matrix values in a hashmap <index,value>
	 */
	public static HashMap<CellIndex, Double> readRMatrixFromFS(String filePath)
	{
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();

		try(BufferedReader reader = new BufferedReader(new FileReader(filePath)))
		{
			// skip both R header lines
			String line = reader.readLine();

			int matrixType = -1;
			if ( line.endsWith(" general") )
				matrixType = 1;
			if ( line.endsWith(" symmetric") )
				matrixType = 2;

			if ( matrixType == -1 )
				throw new RuntimeException("unknown matrix type while reading R matrix: " + line);

			line = reader.readLine(); // header line with dimension and nnz information

			while ((line = reader.readLine()) != null) {
				StringTokenizer st = new StringTokenizer(line, " ");
				int i = Integer.parseInt(st.nextToken());
				int j = Integer.parseInt(st.nextToken());
				if( st.hasMoreTokens() ) {
					double v = Double.parseDouble(st.nextToken());
					if( v==0.0 ) continue;
					expectedValues.put(new CellIndex(i, j), v);
					if ( matrixType == 2 )
						expectedValues.put(new CellIndex(j, i), v);
				}
				else { //pattern
					expectedValues.put(new CellIndex(i, j), 1.0);
					if ( matrixType == 2 )
						expectedValues.put(new CellIndex(j, i), 1.0);
				}
			}
		}
		catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}

		return expectedValues;
	}

	/**
	 * Reads a scalar value in DML format from HDFS
	 */
	public static HashMap<CellIndex, Double> readDMLScalarFromHDFS(String filePath) {
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();
		expectedValues.put(new CellIndex(1,1), readDMLScalar(filePath));
		return expectedValues;
	}

	public static double readDMLScalar(String filePath) {
		try {
			double d=Double.NaN;
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			String line;
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))){
					while ((line = outIn.readLine()) != null) { // only 1 scalar value in file
						d = Double.parseDouble(line);
					}
				}
			}
			return d;
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return Double.NaN;
	}

	public static boolean readDMLBoolean(String filePath) {
		try {
			boolean b = false;
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			String line;
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
					while ((line = outIn.readLine()) != null) { // only 1 scalar value in file
						b = Boolean.valueOf(Boolean.parseBoolean(line));
					}
				}
			}
			return b;
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return _AssertOccured;
	}

	public static String readDMLString(String filePath) {
		try {
			StringBuilder sb =  new StringBuilder();
			Path outDirectory = new Path(filePath);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				try(InputStreamReader is = new InputStreamReader(fsout)){
					sb.append(IOUtils.toString(is));
				}
			}
			return sb.toString();
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return null;
	}


	/**
	 * Reads a scalar value in R format from OS's FS
	 */
	public static HashMap<CellIndex, Double> readRScalarFromFS(String filePath) {
		HashMap<CellIndex, Double> expectedValues = new HashMap<>();
		expectedValues.put(new CellIndex(1,1), readRScalar(filePath));
		return expectedValues;
	}

	public static Double readRScalar(String filePath) {
		try {
			double d = Double.NaN;
			try(BufferedReader compareIn = new BufferedReader(new FileReader(filePath))) {
				String line;
				while ((line = compareIn.readLine()) != null) { // only 1 scalar value in file
					d = Double.parseDouble(line);
				}
			}
			return d;
		} catch (IOException e) {
			assertTrue("could not read from file " + filePath, false);
		}
		return Double.NaN;
	}

	public static double[][] readExpectedResource(String file, int rows, int cols) throws IOException {
		String file2 = "src/test/resources/expected/" + file;
		double[][] ret = new double[rows][cols];
		try(BufferedReader br = new BufferedReader(new FileReader(file2))) {
			String line = null;
			int i = 0;
			while((line = br.readLine()) != null) {
				String[] tmp = line.trim().split(" ");
				for (int j=0; j<tmp.length; j++)
					ret[i][j] = Double.parseDouble(tmp[j]);
				i++;
			}
		}
		return ret;
	}
	
	public static String processMultiPartCSVForR(String csvFile) throws IOException {
		File csv = new File(csvFile);
		if (csv.isDirectory()) {
			File[] parts = csv.listFiles();

			int count=0;
			int index = -1;
			for(int i=0; i < parts.length; i++ ) {
				File f = parts[i];
				String path = f.getPath();
				if (path.startsWith(".") && path.endsWith(".crc"))
					continue;
				count++;
				index = i;
			}

			if ( count == 1) {
				csvFile = parts[index].toString();
			}
			else if ( count > 1 ) {
				File tmp = new File(csvFile+"_temp.csv");

				try( OutputStreamWriter out = new OutputStreamWriter(new FileOutputStream(tmp), "UTF-8") ) {
					// Directory listing may contain .crc files or may be in the
					// wrong order. Sanitize the list of names.
					ArrayList<String> partNames = new ArrayList<>();
					for (File part : parts) {
						String partName = part.getName();
						if (false == partName.endsWith(".crc")) {
							partNames.add(partName);
						}
					}
					Collections.sort(partNames);

					for (String name : partNames) {
						File part = new File(csv, name);
						// Assume that each file fits into memory.
						String fileContents = FileUtils.readFileToString(part,
								"UTF-8");
						out.append(fileContents);
					}
				}

				csvFile = tmp.getCanonicalPath();
			}
			else {
				throw new RuntimeException("Unexpected error while reading a CSV file in R: " + count);
			}
		}
		return csvFile;
	}

	/**
	 * Compares two double values regarding tolerance t. If one or both of them
	 * is null it is converted to 0.0.
	 *
	 * @param v1
	 * @param v2
	 * @param t Tolerance
	 * @return
	 */
	public static boolean compareCellValue(Double v1, Double v2, double t, boolean ignoreNaN) {
		if (v1 == null)
			v1 = 0.0;
		if (v2 == null)
			v2 = 0.0;
		if( ignoreNaN && (v1.isNaN() || v1.isInfinite() || v2.isNaN() || v2.isInfinite()) )
			return true;
		if (v1.equals(v2))
			return true;

		if(AutomatedTestBase.TEST_GPU) {
			return Math.abs(v1 - v2) <= Math.max(t, AutomatedTestBase.GPU_TOLERANCE);
		}


		return Math.abs(v1 - v2) <= t;
	}

	public static void compareVectors(double[] expected, double[] actual, double tolerance){
		// just overload
		compareMatrices(expected, actual, tolerance);
	}

	public static void compareMatrices(double[] expectedMatrix, double[] actualMatrix, double epsilon) {
		compareMatrices(new double[][]{expectedMatrix},
			new double[][]{actualMatrix}, 1, expectedMatrix.length, epsilon);
	}


	public static void compareMatrices(double[][] expectedMatrix, double[][] actualMatrix, int rows, int cols,
		double epsilon) {
		compareMatrices(expectedMatrix, actualMatrix, expectedMatrix.length, expectedMatrix[0].length, epsilon, "");
	}

	public static void compareMatrices(double[][] expectedMatrix, double[][] actualMatrix, int rows, int cols,
			double epsilon, String message) {
		if(expectedMatrix.length != rows && expectedMatrix[0].length != cols)
			fail("Invalid number of rows and cols in expected");
		if(actualMatrix.length != rows && actualMatrix[0].length != cols)
			fail("Invalid number of rows and cols in actual");
		
		int countErrors = 0;
		for (int i = 0; i < rows && countErrors < 10; i++) {
			for (int j = 0; j < cols && countErrors < 10; j++) {
				if (!compareCellValue(expectedMatrix[i][j], actualMatrix[i][j], epsilon, true)) {
					message += ("\n Expected: " +expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
					countErrors++;
				}
			}
		}
		if(countErrors == 10){
			assertTrue(message+" \n More than 10 values are not equal using epsilon " + epsilon, countErrors == 0);
		}else{
			assertTrue(message+" \n" + countErrors + " values are not in equal using epsilon " + epsilon, countErrors == 0);
		}
	}

	public static void compareMatrices(double[][] expectedMatrix, double[][] actualMatrix, double epsilon){
		compareMatrices(expectedMatrix, actualMatrix, epsilon, "");
	}

	public static void compareMatrices(double[][] expectedMatrix, double[][] actualMatrix, double epsilon, String message){
		assertEqualColsAndRows(expectedMatrix,actualMatrix);
		compareMatrices(expectedMatrix, actualMatrix, expectedMatrix.length, expectedMatrix[0].length, epsilon, message);
	}

	public static void compareFrames(String[][] expectedFrame, String[][] actualFrame, int rows, int cols ) {
		int countErrors = 0;
		StringBuilder sb = new StringBuilder();
		for (int i = 0; i < rows&& countErrors < 10; i++) {
			for (int j = 0; j < cols && countErrors < 10; j++) {
				if( !( (expectedFrame[i][j]==null && actualFrame[i][j]==null) ||
					expectedFrame[i][j].equals(actualFrame[i][j]) || (expectedFrame[i][j]+".0").equals(actualFrame[i][j])) ) {
					sb.append("Expected:" + expectedFrame[i][j] +" vs actual: "+actualFrame[i][j]+" at "+i+" "+j + "\n");
					countErrors++;
				}
			}
		}
		sb.append("at least " + countErrors + " values are not equal" );
		assertTrue(sb.toString(), countErrors == 0);
	}

	public static void compareFrames(FrameBlock expected, FrameBlock actual, boolean checkMeta) {
		if(expected == null && actual == null)
			return;
		assertTrue("Expected frame was null pointer", expected != null);
		assertTrue("Actual frame was null pointer", actual != null);
		assertEquals("Number of columns and rows are not equivalent", expected.getNumRows(), actual.getNumRows());
		assertEquals("Number of columns and rows are not equivalent", expected.getNumColumns(), actual.getNumColumns());

		final int rows = expected.getNumRows();
		final int cols = expected.getNumColumns();
		if(checkMeta)
			checkMetadata(expected, actual);
		if(expected.getNumRows() == 0){
			if (expected.getColumns() != null)
				fail();
			if (actual.getColumns() != null) 
				fail();
		}
		else{
			for(int j = 0; j < cols; j++) {
				Array<?> ec = expected.getColumn(j);
				Array<?> ac = actual.getColumn(j);
				if(ec.containsNull()) {
					if(!ac.containsNull()) {
						fail("Expected both columns to be containing null if one null:\n\nExpected containing null:\n"
							+ ec.toString().substring(0, 1000) + "\n\nActual:\n" + ac.toString().substring(0, 1000));
					}
				}
				else if(ac.containsNull()) {
					fail("Expected both columns to be containing null if one null:\n\nExpected:\n"
						+ ec.toString().substring(0, 1000) + "\n\nActual containing null:\n" + ac.toString().substring(0, 1000));
				}
			}
		}

		for(int i = 0; i < rows; i++) {
			for(int j = 0; j < cols; j++) {
				final Object a = expected.get(i, j);
				final Object b = actual.get(i, j);
				if(a == null)
					assertTrue(a == b);
				else if(a != null && b != null) {
					try{
						final String as = a.toString();
						final String bs = b.toString();
						boolean equivalent = as.equals(bs) || equivalentHigherTypeFrame(as, bs);
						assertTrue("Values not equivalent at: " + i + ", " + j + "  : " + as + " vs " + bs, equivalent);
					}
					catch (Exception e ){
						fail(" Values not equivalent at: " + i + ", " + j + "  : " + a + " vs " + b);
					}
				}
			}
		}
	}

	private static boolean equivalentHigherTypeFrame(String a, String b) {
		ValueType at = FrameUtil.isType(a);
		ValueType bt = FrameUtil.isType(b);
		ValueType shared = ValueType.getHighestCommonType(at, bt);
		Object ao = ArrayFactory.parseString(a, shared);
		Object bo = ArrayFactory.parseString(b, shared);
		return ao.equals(bo);
	}

	private static void checkMetadata(FrameBlock expected, FrameBlock actual) {
		int cols = expected.getNumColumns();
		ColumnMetadata[] expectedMeta = expected.getColumnMetadata();
		ColumnMetadata[] actualMeta = actual.getColumnMetadata();

		if((expectedMeta == null && actualMeta != null) || (expectedMeta != null && actualMeta == null))
			fail("wrongly allocated metadata");
		else if(expectedMeta != null && actualMeta != null) {
			assertEquals("MetaData not correct size", expectedMeta.length, cols);
			assertEquals("MetaData not correct size", actualMeta.length, cols);
			for(int i = 0; i < cols; i++)
				if(!expectedMeta[i].equals(actualMeta[i]))
					fail("Meta data not equivalent: " + expectedMeta[i] + "  vs  " + actualMeta[i]);
		}

		String[] expectedColNames = expected.getColumnNames(false);
		String[] actualColNames = expected.getColumnNames(false);
		if((expectedColNames == null && actualColNames != null) || (expectedColNames != null && actualColNames == null))
			fail("wrongly allocated metadata");
		else if(expectedColNames != null && actualColNames != null) {
			assertEquals("Column names not correct size", expectedColNames.length, cols);
			assertEquals("Column names not correct size", actualColNames.length, cols);
			for(int i = 0; i < cols; i++) {
				assertEquals("Column names not equivalent", expectedColNames[i], actualColNames[i]);
			}
		}

	}

	public static void compareFramesAsString(FrameBlock expected, FrameBlock actual, boolean checkMeta) {
		assertEquals("Number of columns and rows are not equivalent", expected.getNumRows(), actual.getNumRows());
		assertEquals("Number of columns and rows are not equivalent", expected.getNumColumns(), actual.getNumColumns());

		int rows = expected.getNumRows();
		int cols = expected.getNumColumns();

		if(checkMeta)
			checkMetadata(expected, actual);

		for(int i = 0; i < rows; i++) {
			for(int j = 0; j < cols; j++) {
				assertEquals("Values not equivalent at: " + i + ", " + j, expected.get(i, j).toString(),
					actual.get(i, j).toString());
			}
		}
	}

	public static void compareScalars(double d1, double d2, double tol) {
		assertTrue("Given scalars do not match: " + d1 + " != " + d2 , compareCellValue(d1, d2, tol, false));
	}

	public static void compareMatricesBit(double[][] expectedMatrix, double[][] actualMatrix, int rows, int cols,
		long maxUnitsOfLeastPrecision){
		int countErrors = 0;
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if( !compareScalarBits(expectedMatrix[i][j], actualMatrix[i][j], maxUnitsOfLeastPrecision)){
					System.out.println("Expected: " + expectedMatrix[i][j] +" vs actual: "+actualMatrix[i][j]+" at "+i+" "+j);
					countErrors++;
				}
			}
		}
		assertTrue("" + countErrors + " values are not in equal", countErrors == 0);
	}

	public static void compareMatricesBitAvgDistance(double[][] expectedMatrix, double[][] actualMatrix,
		long maxUnitsOfLeastPrecision, long maxAvgDistance, String message) {
		final int rows = expectedMatrix.length;
		final int cols = actualMatrix[0].length;
		int countErrors = 0;
		long sumDistance = 0;

		long distance;
		for(int i = 0; i < rows && countErrors < 20; i++) {
			for(int j = 0; j < cols && countErrors < 20; j++) {
				double v1 = expectedMatrix[i][j];
				double v2 = actualMatrix[i][j];
				if(v1 == 0 && v2 == 0)
					continue;
				else if(v1 == 0 || v2 == 0) {
					if(Math.abs(v1 - v2) > 1E-16) {
						message += ("\n Expected:" + v1 + " vs actual: " + v2 + " at " + i + " " + j);
						countErrors++;
					}
				}
				else {
					distance = compareScalarBits(expectedMatrix[i][j], actualMatrix[i][j]);
					sumDistance += distance;
					if(distance > maxUnitsOfLeastPrecision) {
						message += ("\n Expected:" + v1 + " vs actual: " + v2 + " at " + i + " " + j + " Distance in bits: "
							+ distance);
						countErrors++;
					}
				}
			}
		}
		if(countErrors == 20) {
			assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
		}
		else {
			long avgDistance = sumDistance / (rows * cols);
			assertTrue(message + "\n" + countErrors + " values are not in equal", countErrors == 0);
			assertTrue(message + "\nThe avg distance in bits: " + avgDistance + " was higher than max: " + maxAvgDistance,
				avgDistance <= maxAvgDistance);
		}
	}

	public static void compareMatricesBitAvgDistance(double[] expectedMatrix, double[] actualMatrix,
		long maxUnitsOfLeastPrecision, long maxAvgDistance, String message) {
		final int rows = expectedMatrix.length;
		int countErrors = 0;
		long sumDistance = 0;

		long distance;
		for(int i = 0; i < rows && countErrors < 20; i++) {
			double v1 = expectedMatrix[i];
			double v2 = actualMatrix[i];
			if(v1 == 0 && v2 == 0)
				continue;
			else if(v1 == 0 || v2 == 0) {
				if(Math.abs(v1 - v2) > 1E-16) {
					message += ("\n Expected:" + v1 + " vs actual: " + v2 + " at " + i);
					countErrors++;
				}
			}
			else {
				distance = compareScalarBits(expectedMatrix[i], actualMatrix[i]);
				sumDistance += distance;
				if(distance > maxUnitsOfLeastPrecision) {
					message += ("\n Expected:" + v1 + " vs actual: " + v2 + " at " + i + " Distance in bits: " + distance);
					countErrors++;
				}
			}

		}
		if(countErrors == 20) {
			assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
		}
		else {
			long avgDistance = sumDistance / (rows);
			assertTrue(message + "\n" + countErrors + " values are not in equal", countErrors == 0);
			assertTrue(message + "\nThe avg distance in bits: " + avgDistance + " was higher than max: " + maxAvgDistance,
				avgDistance <= maxAvgDistance);
		}
	}

	public static void compareMatricesBitAvgDistance(MatrixBlock expectedMatrix, MatrixBlock actualMatrix,
		long maxUnitsOfLeastPrecision, long maxAvgDistance) {
		compareMatricesBitAvgDistance(expectedMatrix, actualMatrix, maxUnitsOfLeastPrecision, maxAvgDistance, "");
	}

	public static void compareMatricesBitAvgDistance(MatrixBlock expectedMatrix, MatrixBlock actualMatrix,
		long maxUnitsOfLeastPrecision, long maxAvgDistance, String message) {
		if(expectedMatrix instanceof CompressedMatrixBlock)
			expectedMatrix = ((CompressedMatrixBlock) expectedMatrix).decompress();
		if(actualMatrix instanceof CompressedMatrixBlock)
			actualMatrix = ((CompressedMatrixBlock) actualMatrix).decompress();
		
		if(expectedMatrix.isEmpty() && actualMatrix.isEmpty()){
			expectedMatrix.recomputeNonZeros();
			actualMatrix.recomputeNonZeros();
			if(expectedMatrix.getNonZeros() == 0 && expectedMatrix.getNonZeros() == actualMatrix.getNonZeros())
				return; // equally empty
		}
		else if(expectedMatrix.isEmpty()) {
			if(expectedMatrix.getNumRows() < 10)
				fail(message + "\nThe expected output is empty while the actual matrix is not\n" + expectedMatrix + "\n\n"
					+ "actual:" + actualMatrix);
			fail(message + "\nThe expected output is empty while the actual matrix is not");
		}
		else if(actualMatrix.isEmpty()) {
			if(expectedMatrix.getNumRows() < 10)
				fail(message + "\nThe actual output is empty while the expected matrix is not\nexpected:" + expectedMatrix + "\n\n"
					+ "actual:" + actualMatrix);
			fail(message + "\nThe actual output is empty while the expected matrix is not");
		}
		else if(expectedMatrix.isInSparseFormat() && actualMatrix.isInSparseFormat()) {
			compareMatricesBitAvgDistanceSparse(expectedMatrix.getSparseBlock(), actualMatrix.getSparseBlock(),
				maxUnitsOfLeastPrecision, maxAvgDistance, message, actualMatrix.getNumColumns());
			return;
		}
		final int rows = expectedMatrix.getNumRows();
		final int cols = actualMatrix.getNumColumns();
		int countErrors = 0;
		long sumDistance = 0;

		for(int i = 0; i < rows && countErrors < 20; i++) {
			for(int j = 0; j < cols && countErrors < 20; j++) {
				final double v1 = expectedMatrix.get(i, j);
				final double v2 = actualMatrix.get(i, j);
				if(v1 == 0 && v2 == 0)
					continue;
				else if(v1 == 0 || v2 == 0) {
					// take care of small epsilon from zero
					if(Math.abs(v1 - v2) > 1E-16) {
						message += ("\n Expected:" + v1 + " vs actual: " + v2 + " at " + i + " " + j);
						countErrors++;
					}
				}
				else {
					final long distance = compareScalarBits(v1, v2);
					sumDistance += distance;
					if(distance > maxUnitsOfLeastPrecision) {
						message += ("\n Expected:" + v1 + " vs actual: " + v2 + " at " + i + " " + j + " Distance in bits: "
							+ distance);
						countErrors++;
					}
				}
			}
		}

		if(countErrors == 20) 
			fail(message + "\n At least 20 values are not in equal");
		else {
			long avgDistance = sumDistance / (rows * cols);
			if(countErrors != 0)
				fail(message + "\n" + countErrors + " values are not in equal");
			if(avgDistance > maxAvgDistance)
				fail(message + "\nThe avg distance in bits: " + avgDistance + " was higher than max: " + maxAvgDistance);
		}
	}

	private static void compareMatricesBitAvgDistanceSparse(SparseBlock sbe, SparseBlock sba, long maxUnitsOfLeastPrecision, long maxAvgDistance, String message, int cols){
		final int rows = sbe.numRows();
		int countErrors = 0;
		long sumDistance = 0;
		for(int i = 0; i < rows && countErrors < 20; i++){
			if( sbe.isEmpty(i) !=  sba.isEmpty(i)){

				fail(message +"\nBoth matrices are not equally empty on row : " + i + " :\n" + sbe.get(i) + " vs " + sba.get(i));
			}
			
			if(sbe.isEmpty(i))
				continue;
			
			if(sba.size(i) != sbe.size(i))
				fail(message+"\nNumber of values are not equal in row: " + i +"\nactual:"+ sba.get(i) +"\nexpected:"+ sbe.get(i));

			final double[] e = sbe.values(i);
			final double[] a = sba.values(i);
			final int epos = sbe.pos(i);
			final int apos = sba.pos(i);
			final int elen = sbe.size(i) + epos;
			for(int j = apos, jj = epos; jj < elen; j++, jj++){
				final long distance = compareScalarBits(e[jj], a[j]);
				sumDistance += distance;

				if(distance > maxUnitsOfLeastPrecision) {
					message +=  ("\n Expected:" + e[jj] + " vs actual: " + a[j] + " at " + i + " " + j + " Distance in bits: " + distance);
					countErrors++;
				}
			}
		}

		if(countErrors == 20){
			fail(message + "\n At least 20 values are not in equal");
		}
		else{
			long avgDistance = sumDistance / (rows * cols);
			if(countErrors != 0)
				fail(message + "\n" + countErrors + " values are not in equal");
			if(avgDistance > maxAvgDistance)
				fail(message + "\nThe avg distance in bits: " + avgDistance + " was higher than max: " + maxAvgDistance);
		}
	}

	/**
	 * Get Percent Distance with slight cheat where if values are close to 0.
	 * @param x value 1
	 * @param y value 2
	 * @return Percent distance
	 */
	public static double getPercentDistance(double x, double y, boolean ignoreZero){
		if (Double.isNaN(x)) {
			if(Double.isNaN(y))
				return 1.0;
			else
				return 0.0;
		}
		else if(Double.isNaN(y))
			return 0.0;
		else if(Double.isInfinite(x)) {
			if(Double.isInfinite(y))
				return 1.0;
			else
				return 0.0;
		}
		else if(Double.isInfinite(y))
			return 0.0;
		else if((x < 0 && y > 0) || (x > 0 && y < 0))
			return 0.0;
		double min = Math.abs(Math.min(x, y));
		double max = Math.abs(Math.max(x, y));
		if(ignoreZero && min < 0.0001)
			return 1.0;
		else if(min < 0.0001 || max < 0.0001) {
			min += 0.0001;
			max += 0.0001;
		}

		assertFalse("Failed! because nan from division of : " + min + " / " + max, Double.isNaN(min / max));
		return min / max;
	}

	private static void assertEqualColsAndRows(double[][] expectedMatrix, double[][] actualMatrix){
		assertTrue("The number of columns in the matrixes should be equal :"
			+ expectedMatrix.length  + "  "
			+ actualMatrix.length,
			expectedMatrix.length == actualMatrix.length);
		assertTrue("The number of rows in the matrixes should be equal"
			+ expectedMatrix[0].length  + "  "
			+ actualMatrix[0].length,
			expectedMatrix[0].length == actualMatrix[0].length);
	}

	public static void assertEqualColsAndRows(MatrixBlock expectedMatrix, MatrixBlock actualMatrix) {
		final int er = expectedMatrix.getNumRows();
		final int ec = expectedMatrix.getNumColumns();
		final int ar = actualMatrix.getNumRows();
		final int ac = actualMatrix.getNumColumns();
		if(er != ar || ec != ac)
			fail("The number of rows and columns does not match in matrices expected: " + er + " " + ec + " actual: " + ar + " "+ ac);
	}

	public static void assertEqualColsAndRows(MatrixBlock expectedMatrix, MatrixBlock actualMatrix, String message) {
		final int er = expectedMatrix.getNumRows();
		final int ec = expectedMatrix.getNumColumns();
		final int ar = actualMatrix.getNumRows();
		final int ac = actualMatrix.getNumColumns();
		if(er != ar || ec != ac)
			fail(message + "\nThe number of rows and columns does not match in matrices");
	}

	public static void compareMatricesPercentageDistance(double[][] expectedMatrix, double[][] actualMatrix,
			double percentDistanceAllowed, double maxAveragePercentDistance,  String message){
		assertEqualColsAndRows(expectedMatrix,actualMatrix);
		compareMatricesPercentageDistance(expectedMatrix, actualMatrix, expectedMatrix.length, expectedMatrix[0].length,
			percentDistanceAllowed, maxAveragePercentDistance, message, false);
	}

	public static void compareMatricesPercentageDistance(MatrixBlock expectedMatrix, MatrixBlock actualMatrix,
		double percentDistanceAllowed, double maxAveragePercentDistance, String message) {
		assertEqualColsAndRows(expectedMatrix, actualMatrix,message);
		compareMatricesPercentageDistance(expectedMatrix, actualMatrix, percentDistanceAllowed, maxAveragePercentDistance,
			message, false);
	}

	public static void compareMatricesPercentageDistance(double[][] expectedMatrix, double[][] actualMatrix,
			double percentDistanceAllowed, double maxAveragePercentDistance,  String message, boolean ignoreZero){
		assertEqualColsAndRows(expectedMatrix,actualMatrix);
		compareMatricesPercentageDistance(expectedMatrix, actualMatrix, expectedMatrix.length, expectedMatrix[0].length,
			percentDistanceAllowed, maxAveragePercentDistance, message, ignoreZero);
	}

	public static void compareMatricesPercentageDistance(double[][] expectedMatrix, double[][] actualMatrix, int rows,
		int cols, double percentDistanceAllowed, double maxAveragePercentDistance, String message, boolean ignoreZero) {
		assertTrue("percentDistanceAllowed should be between 1 and 0",
			percentDistanceAllowed >= 0.0 && percentDistanceAllowed <= 1.0);
		assertTrue("maxAveragePercentDistance should be between 1 and 0",
			maxAveragePercentDistance >= 0.0 && maxAveragePercentDistance <= 1.0);

		int countErrors = 0;
		double sumPercentDistance = 0;
		double distance;

		for(int i = 0; i < rows && countErrors < 20; i++) {
			for(int j = 0; j < cols && countErrors < 20; j++) {
				distance = getPercentDistance(expectedMatrix[i][j], actualMatrix[i][j], ignoreZero);
				sumPercentDistance += distance;
				if(distance < percentDistanceAllowed) {
					message += ("\nExpected: " + expectedMatrix[i][j] + " vs actual: " + actualMatrix[i][j] + " at " + i
						+ " " + j + " Distance in percent " + distance);
					countErrors++;
				}
			}
		}
		if(countErrors == 20) {
			assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
		}
		else {
			double avgDistance = sumPercentDistance / (rows * cols);
			assertTrue(message + "\n" + countErrors + " values are not in equal of total: " + (rows * cols),
				countErrors == 0);
			assertTrue(
				message + "\nThe avg distance: " + avgDistance + " was lower than threshold " + maxAveragePercentDistance,
				avgDistance > maxAveragePercentDistance);
		}
	}

	public static void compareMatricesPercentageDistance(double[] expectedMatrix, double[] actualMatrix,
		double percentDistanceAllowed, double maxAveragePercentDistance, String message, boolean ignoreZero) {
		assertTrue("percentDistanceAllowed should be between 1 and 0",
			percentDistanceAllowed >= 0.0 && percentDistanceAllowed <= 1.0);
		assertTrue("maxAveragePercentDistance should be between 1 and 0",
			maxAveragePercentDistance >= 0.0 && maxAveragePercentDistance <= 1.0);

		int countErrors = 0;
		double sumPercentDistance = 0;
		double distance;
		int rows = expectedMatrix.length;
		for(int i = 0; i < rows && countErrors < 20; i++) {
			distance = getPercentDistance(expectedMatrix[i], actualMatrix[i], ignoreZero);
			sumPercentDistance += distance;
			if(distance < percentDistanceAllowed) {
				message += ("\nExpected: " + expectedMatrix[i] + " vs actual: " + actualMatrix[i] + " at " + i
					+ " Distance in percent " + distance);
				countErrors++;
			}
		}

		if(countErrors == 20) {
			assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
		}
		else {
			double avgDistance = sumPercentDistance / (rows);
			assertTrue(message + "\n" + countErrors + " values are not in equal of total: " + (rows), countErrors == 0);
			assertTrue(
				message + "\nThe avg distance: " + avgDistance + " was lower than threshold " + maxAveragePercentDistance,
				avgDistance > maxAveragePercentDistance);
		}
	}

	public static void compareMatricesPercentageDistance(MatrixBlock expectedMatrix, MatrixBlock actualMatrix,
		double percentDistanceAllowed, double maxAveragePercentDistance, String message, boolean ignoreZero) {
		assertTrue("percentDistanceAllowed should be between 1 and 0",
			percentDistanceAllowed >= 0.0 && percentDistanceAllowed <= 1.0);
		assertTrue("maxAveragePercentDistance should be between 1 and 0",
			maxAveragePercentDistance >= 0.0 && maxAveragePercentDistance <= 1.0);

		if(expectedMatrix.isEmpty() && actualMatrix.isEmpty()) {
			expectedMatrix.recomputeNonZeros();
			actualMatrix.recomputeNonZeros();
			if(expectedMatrix.getNonZeros() == 0 && expectedMatrix.getNonZeros() == actualMatrix.getNonZeros())
				return; // equally empty
		}
		if(expectedMatrix.isInSparseFormat() && actualMatrix.isInSparseFormat()) {
			compareMatricesPercentageDistanceSparse(expectedMatrix.getSparseBlock(), actualMatrix.getSparseBlock(),
				percentDistanceAllowed, maxAveragePercentDistance, message, ignoreZero, actualMatrix.getNumColumns());
			return;
		}

		final int rows = expectedMatrix.getNumRows();
		final int cols = expectedMatrix.getNumColumns();
		int countErrors = 0;
		double sumPercentDistance = 0;

		for(int i = 0; i < rows && countErrors < 20; i++) {
			for(int j = 0; j < cols && countErrors < 20; j++) {
				final double distance = getPercentDistance(expectedMatrix.get(i, j),
					actualMatrix.get(i, j), ignoreZero);
				sumPercentDistance += distance;
				if(distance < percentDistanceAllowed) {
					message += ("\nExpected: " + expectedMatrix.get(i, j) + " vs actual: "
						+ actualMatrix.get(i, j) + " at " + i + " " + j + " Distance in percent " + distance);
					countErrors++;
				}
			}
		}
		if(countErrors == 20) {
			fail(message + "\n At least 20 values are not in equal");
		}
		else {
			double avgDistance = sumPercentDistance / (rows * cols);
			if(countErrors != 0)
				fail(message + "\n" + countErrors + " values are not in equal of total: " + (rows * cols));
			if(avgDistance < maxAveragePercentDistance)
				fail(message + "\nThe avg distance: " + avgDistance + " was lower than threshold "
					+ maxAveragePercentDistance);
		}
	}

	private static void compareMatricesPercentageDistanceSparse(SparseBlock sbe, SparseBlock sba,
		double percentDistanceAllowed, double maxAveragePercentDistance, String message, boolean ignoreZero, int cols) {
		final int rows = sbe.numRows();
		int countErrors = 0;
		double sumPercentDistance = 0;

		for(int i = 0; i < rows && countErrors < 20; i++){

			if( sbe.isEmpty(i) !=  sba.isEmpty(i))
				fail(message +"\nBoth matrices are not equally empty on row : " + i);
			
			if(sbe.isEmpty(i))
				continue;
			
			final double[] e = sbe.values(i);
			final double[] a = sba.values(i);
			final int epos = sbe.pos(i);
			final int apos = sba.pos(i);
			final int elen = sbe.size(i) + epos;
			if(sba.size(i) != sbe.size(i)){
				String err  = message + "\nNumber of values are not equal in row: " + i + "  actual: " + sba.size(i) + " expected: " + sbe.size(i);
				for(int j = epos ; j < elen; j++)
					err+= " " +e[j];
				for(int j = apos ; j < sba.size(i) + apos; j++)
					err+= " " + a[j];
				fail(err);
			}

			for(int j = apos, jj = epos; jj < elen; j++, jj++){
				final double distance = getPercentDistance(e[jj], a[j], ignoreZero);
				sumPercentDistance += distance;
				if(distance < percentDistanceAllowed) {
					message += ("\nExpected: " + e[jj] + " vs actual: " +  a[j] + " at " + i
						+ " " + j + " Distance in percent " + distance);
					countErrors++;
				}
			}
		}

		if(countErrors == 20){
			fail(message + "\n At least 20 values are not in equal");
		}
		else{
			double avgDistance = sumPercentDistance / (rows * cols);
			if(countErrors != 0)
				fail(message + "\n" + countErrors + " values are not in equal");
			if(avgDistance > maxAveragePercentDistance)
				fail(message + "\nThe avg distance in percent: " + avgDistance + " was higher than max: " + maxAveragePercentDistance);
		}
	}

	public static void compareMatricesAvgRowDistance(double[][] expectedMatrix, double[][] actualMatrix, int rows,
		int cols, double averageDistanceAllowed){
			String message = "";
			int countErrors = 0;

			for (int i = 0; i < rows && countErrors < 20; i++) {
				double distanceSum = 0;
				for (int j = 0; j < cols && countErrors < 20; j++) {
					distanceSum += expectedMatrix[i][j] - actualMatrix[i][j];
				}
				if(distanceSum / cols > averageDistanceAllowed){
					message += ("Average distance for row " + i + ":" + (distanceSum / cols) + "\n");
					countErrors++;
				}
			}
			if(countErrors == 20){
				assertTrue(message + "\n At least 20 values are not in equal", countErrors == 0);
			}
			else{
				assertTrue(message + "\n" + countErrors + " values are not in equal of total: " + (rows), countErrors == 0);
			}
	}

	public static void compareMatricesBitAvgDistance(double[][] expectedMatrix, double[][] actualMatrix, int rows,
		int cols, long maxUnitsOfLeastPrecision, long maxAvgDistance) {
			compareMatricesBitAvgDistance(expectedMatrix, actualMatrix, maxUnitsOfLeastPrecision, maxAvgDistance, "");
	}

	/**
	 * Compare two double precision floats for equality within a margin of error.
	 *
	 * This can be used to compensate for inequality caused by accumulated
	 * floating point math errors.
	 *
	 * The error margin is specified in ULPs (units of least precision).
	 * A one-ULP difference means there are no representable floats in between.
	 * E.g. 0f and 1.4e-45f are one ULP apart. So are -6.1340704f and -6.13407f.
	 * Depending on the number of calculations involved, typically a margin of
	 * 1-5 ULPs should be enough.
	 *
	 * @param d1 The expected value.
	 * @param d2 The actual value.
	 * @return Whether distance in bits
	 */
	public static long compareScalarBits(double d1, double d2) {
		if(d1 > 0.1 && d2 < 0.1 || d1 < 0.1 && d2 > 0.1)
			return Long.MAX_VALUE;

		long expectedBits = Double.doubleToLongBits(d1) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d1) : Double.doubleToLongBits(d1);
		long actualBits = Double.doubleToLongBits(d2) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d2) : Double.doubleToLongBits(d2);
		long difference = expectedBits > actualBits ? expectedBits - actualBits : actualBits - expectedBits;
		return difference;
	}

	public static boolean compareScalarBits(double d1, double d2, long maxUnitsOfLeastPrecision) {
		if (Double.isNaN(d1) || Double.isNaN(d2))
			return false;

		// assertTrue("Both values should be positive or negative",(d1 >= 0 && d2 >= 0) || (d2 <= 0 && d1 <= 0));
		long expectedBits = Double.doubleToLongBits(d1) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d1) : Double.doubleToLongBits(d1);
		long actualBits = Double.doubleToLongBits(d2) < 0 ? 0x8000000000000000L - Double.doubleToLongBits(d2) : Double.doubleToLongBits(d2);
		long difference = expectedBits > actualBits ? expectedBits - actualBits : actualBits - expectedBits;
		return difference <= maxUnitsOfLeastPrecision;
	}

	public static void compareScalarBitsJUnit(double d1, double d2, long maxUnitsOfLeastPrecision){
		compareScalarBitsJUnit(d1,d2,maxUnitsOfLeastPrecision, null);
	}

	public static void compareScalarBitsJUnit(double d1, double d2, long maxUnitsOfLeastPrecision, String errorMessage){
		long distance = compareScalarBits(d1,d2);
		boolean equal = distance <= maxUnitsOfLeastPrecision;
		if(!equal){
			String message = "Given scalars do not match: " + d1 + " != " + d2 + " with bitDistance: " + distance;
			if(errorMessage != null)
				message = errorMessage + "\n" + message;
			fail(message);
		}
	}

	public static void compareScalars(String expected, String actual) {
			assertEquals(expected, actual);
	}
	
	public static void compareScalars(Boolean expected, Boolean actual) {
			assertEquals(expected, actual);
	}

	public static boolean compareMatrices(HashMap<CellIndex, Double> m1, HashMap<CellIndex, Double> m2,
			double tolerance, String name1, String name2)
	{
		return compareMatrices(m1, m2, tolerance, name1, name2, false);
	}

	public static void compareMatrices(HashMap<CellIndex, Double> m1, MatrixBlock m2, double tolerance) {
		double[][] ret1 = convertHashMapToDoubleArray(m1);
		double[][] ret2 = DataConverter.convertToDoubleMatrix(m2);
		compareMatrices(ret1, ret2, m2.getNumRows(), m2.getNumColumns(), tolerance);
	}

	public static void compareMatrices(MatrixBlock m1, MatrixBlock m2, double tolerance) {
		compareMatrices(m1, m2, tolerance, null);
	}

	/**
	 * compare and error out on differences above tolerance
	 * 
	 * @param m1        expected matrix
	 * @param m2        actual matrix
	 * @param tolerance tolerance
	 * @param message   error message
	 */
	public static void compareMatrices(MatrixBlock m1, MatrixBlock m2, double tolerance, String message) {
		if(m1.getNumRows() != m2.getNumRows() || m1.getNumColumns() != m2.getNumColumns())
			fail("Matrices are different sizes " + m1.getNumRows() + "," + m1.getNumColumns() + " vs " + m2.getNumRows()
				+ "," + m2.getNumColumns());
		double[][] ret1 = DataConverter.convertToDoubleMatrix(m1);
		double[][] ret2 = DataConverter.convertToDoubleMatrix(m2);
		compareMatrices(ret1, ret2, m2.getNumRows(), m2.getNumColumns(), tolerance, message);
	}

	public static void compareMatrices(MatrixBlock m1, double[][] m2, double tolerance, String message) {
		double[][] ret1 = DataConverter.convertToDoubleMatrix(m1);
		compareMatrices(ret1, m2, m1.getNumRows(), m1.getNumColumns(), tolerance, message);
	}

	public static void compareMatrices(double[][] m1, MatrixBlock m2, double tolerance, String message) {
		double[][] ret2 = DataConverter.convertToDoubleMatrix(m2);
		compareMatrices(m1, ret2, m2.getNumRows(), m2.getNumColumns(), tolerance, message);
	}

	/**
	 * Compares two matrices given as HashMaps. The matrix containing more nnz
	 * is iterated and each cell value compared against the corresponding cell
	 * in the smaller matrix, to ensure that all values are compared.<br/>
	 * This method does not assert. Instead statistics are added to
	 * AssertionBuffer, at the end of the test you should call
	 * {@link TestUtils#displayAssertionBuffer()}.
	 *
	 * @param m1
	 * @param m2
	 * @param tolerance
	 * @return True if matrices are identical regarding tolerance.
	 */
	public static boolean compareMatrices(HashMap<CellIndex, Double> m1, HashMap<CellIndex, Double> m2,
			double tolerance, String name1, String name2, boolean ignoreNaN) {
		HashMap<CellIndex, Double> first = m2;
		HashMap<CellIndex, Double> second = m1;
		String namefirst = name2;
		String namesecond = name1;
		boolean flag = true;

		// to ensure that always the matrix with more nnz is iterated
		if (m1.size() > m2.size()) {
			first = m1;
			second = m2;
			namefirst = name1;
			namesecond = name2;
			flag=false;
		}

		int countErrorWithinTolerance = 0;
		int countIdentical = 0;
		double minerr = Double.MAX_VALUE;
		double maxerr = -Double.MAX_VALUE;
		
		for (Entry<CellIndex, Double> e : first.entrySet()) {
			Double v1 = e.getValue() == null ? 0.0 : e.getValue();
			Double v2 = second.get(e.getKey());
			v2 = v2 == null ? 0.0 : v2;
			minerr = Math.min(minerr, Math.abs(v1 - v2));
			maxerr = Math.max(maxerr, Math.abs(v1 - v2));

			if (!compareCellValue(v1, v2, 0, ignoreNaN)) {
				if (!compareCellValue(v1, v2, tolerance, ignoreNaN)) {
					countErrorWithinTolerance++;
					if(countErrorWithinTolerance < 10){
						if(!flag)
							LOG.error(e.getKey() + ": " + v1 + " <--> " + v2);
						else
							LOG.error(e.getKey() + ": " + v2 + " <--> " + v1);
					}

				}
			} else {
				countIdentical++;
			}
		}

		String assertPrefix = (countErrorWithinTolerance == 0) ? "    " : "!  ";
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " # stored values in " + namefirst + ": "
				+ first.size());
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " # stored values in " + namesecond + ": "
				+ second.size());
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " identical values(z=0): " + countIdentical);
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " wrong values(z=" + tolerance + "): "
				+ countErrorWithinTolerance);
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " min error: " + minerr);
		_AssertInfos.add(assertPrefix + name1 + "<->" + name2 + " max error: " + maxerr);

		if (countErrorWithinTolerance == 0)
			return true;

		_AssertOccured = true;
		return false;
	}

	public static int compareTo(ValueType vt, Object in1, Object in2, double tolerance) {
		if(in1 == null && in2 == null) return 0;
		else if(in1 == null) return -1;
		else if(in2 == null) return 1;

		switch( vt ) {
			case STRING:  return ((String)in1).compareTo((String)in2);
			case BOOLEAN: return ((Boolean)in1).compareTo((Boolean)in2);
			case INT64:     return ((Long)in1).compareTo((Long)in2);
			case FP64:
				return (Math.abs((Double)in1-(Double)in2) < tolerance)?0:
					((Double)in1).compareTo((Double)in2);
			default: throw new RuntimeException("Unsupported value type: "+vt);
		}
	}

	public static int compareToR(ValueType vt, Object in1, Object inR, double tolerance) {
		if(in1 == null && (inR == null || (inR.toString().compareTo("NA")==0))) return 0;
		else if(in1 == null && vt == ValueType.STRING) return -1;
		else if(inR == null) return 1;
		switch( vt ) {
			case STRING: 
				if (in1 == null)
					throw new DMLRuntimeException("Fail");
			 	return ((String)in1).compareTo((String)inR);
			case BOOLEAN:
				if(in1 == null)
					return Boolean.FALSE.compareTo(((Boolean)inR).booleanValue());
				else
					return ((Boolean)in1).compareTo((Boolean)inR);
			case INT64:
				if(in1 == null)
					return Long.valueOf(0).compareTo(((Long)inR));
				else
					return ((Long)in1).compareTo((Long)inR);
			case FP64:
				if(in1 == null)
					return Double.valueOf(0).compareTo((Double)inR);
				else
					return (Math.abs((Double)in1-(Double)inR) < tolerance)?0:
						((Double)in1).compareTo((Double)inR);
			default: throw new RuntimeException("Unsupported value type: "+vt);
		}
	}

	public static HashMap<CellIndex, Double> convert2DDoubleArrayToHashMap(double[][] matrix) {
		HashMap<CellIndex, Double> hmMatrix = new HashMap<>();
		for (int i = 0; i < matrix.length; i++) {
			for (int j = 0; j < matrix[i].length; j++) {
				if (matrix[i][j] != 0)
					hmMatrix.put(new CellIndex(i + 1, j + 1), matrix[i][j]);
			}
		}

		return hmMatrix;
	}

	public static double[][] convertHashMapToDoubleArray(HashMap <CellIndex, Double> matrix) {
		int max_rows = -1, max_cols= -1;
		for(CellIndex ix : matrix.keySet()) {
			max_rows = Math.max(max_rows, ix.row);
			max_cols = Math.max(max_cols, ix.column);
		}
		return convertHashMapToDoubleArray(matrix, max_rows, max_cols);
	}

	public static double[][] convertHashMapToDoubleArray(HashMap<CellIndex, Double> matrix, int rows, int cols) {
		double [][] ret_arr = new double[rows][cols];
		for(Entry<CellIndex, Double> e : matrix.entrySet()) {
			int i = e.getKey().row-1;
			int j = e.getKey().column-1;
			ret_arr[i][j] = e.getValue();
		}
		return ret_arr;
	}

	public static double[] convert2Dto1DDoubleArray(double[][] array) {
		double[] ret = new double[array.length * array[0].length];
		int c = 0;
		for (int i = 0; i < array.length; i++) {
			for (int j = 0; j < array[0].length; j++) {
				ret[c++] = array[i][j];
			}
		}

		return ret;
	}

	/**
	 * Converts a 1D double array into a 2D double array.
	 *
	 * @param array
	 * @return
	 */
	public static double[][] convert1Dto2DDoubleArray(double[] array, int rows) {
		int cols = array.length / rows;
		double[][] ret = new double[rows][cols];

		for (int c = 0; c < array.length; c++) {
			ret[c % cols][c / cols] = array[c];
		}

		return ret;
	}

	/**
	 * Asserts the content of assertion buffer, which may contain of all methods
	 * which assert not themselves but add information to that buffer.
	 */
	public static void displayAssertionBuffer() {
		String msg = "Detailed matrices characteristics:\n";
		for (String cur : _AssertInfos) {
			msg += cur + "\n";
		}

		assertTrue(msg, !_AssertOccured);
	}

	/**
	 * <p>
	 * Compares a dml matrix file in HDFS with a file in normal file system
	 * generated by R
	 * </p>
	 *
	 * @param rFile
	 *            file with values calculated by R
	 * @param hdfsDir
	 *            file with actual values calculated by DML
	 * @param epsilon
	 *            tolerance for value comparison
	 */
	public static void compareDMLHDFSFileWithRFile(String rFile, String hdfsDir, double epsilon) {
		try {
			Path outDirectory = new Path(hdfsDir);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			HashMap<CellIndex, Double> expectedValues = new HashMap<>();
			HashMap<CellIndex, Double> actualValues = new HashMap<>();
			try(BufferedReader compareIn = new BufferedReader(new FileReader(rFile))) {
				// skip both R header lines
				compareIn.readLine();
				compareIn.readLine();
				readValuesFromFileStreamAndPut(compareIn, expectedValues);
			}

			FileStatus[] outFiles = fs.listStatus(outDirectory);

			for (FileStatus file : outFiles) {
				FSDataInputStream fsout = fs.open(file.getPath());
				readValuesFromFileStream(fsout, actualValues);
			}
			Set<CellIndex> allKeys = new HashSet<>();
			allKeys.addAll(expectedValues.keySet());
			if(expectedValues.size() != actualValues.size())
				allKeys.addAll(actualValues.keySet());

			int countErrors = 0;
			for (CellIndex index : allKeys) {
				Double expectedValue = expectedValues.get(index);
				Double actualValue = actualValues.get(index);
				if (expectedValue == null)
					expectedValue = 0.0;
				if (actualValue == null)
					actualValue = 0.0;

				if (!compareCellValue(expectedValue, actualValue, epsilon, false))
					countErrors++;
			}
			assertTrue("for file " + hdfsDir + " " + countErrors + " values are not in equal", countErrors == 0);
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Checks a matrix against a number of specifications.
	 * </p>
	 *
	 * @param data
	 *            matrix data
	 * @param mc
	 *            matrix characteristics
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 */
	public static void checkMatrix(double[][] data, MatrixCharacteristics mc, long rows, long cols, double min, double max) {
		assertEquals(rows, mc.getRows());
		assertEquals(cols, mc.getCols());
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				assertTrue("invalid value",
						((data[i][j] >= min && data[i][j] <= max) || data[i][j] == 0));
			}
		}
	}

	/**
	 * <p>
	 * Checks a matrix read from a file in text format against a number of
	 * specifications.
	 * </p>
	 *
	 * @param outDir
	 *            directory containing the matrix
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 */
	public static void checkMatrix(String outDir, long rows, long cols, double min, double max) {
		try {
			Path outDirectory = new Path(outDir);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			assertTrue(outDir + " does not exist", fs.exists(outDirectory));

			if( fs.getFileStatus(outDirectory).isDirectory() )
			{
				FileStatus[] outFiles = fs.listStatus(outDirectory);
				for (FileStatus file : outFiles) {
					FSDataInputStream fsout = fs.open(file.getPath());
					try( BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout)) ){
						String line;
						while ((line = outIn.readLine()) != null) {
							String[] rcv = line.split(" ");
							long row = Long.parseLong(rcv[0]);
							long col = Long.parseLong(rcv[1]);
							double value = Double.parseDouble(rcv[2]);
							assertTrue("invalid row index", (row > 0 && row <= rows));
							assertTrue("invlaid column index", (col > 0 && col <= cols));
							assertTrue("invalid value", ((value >= min && value <= max) || value == 0));
						}
					}
				}
			}
			else
			{
				FSDataInputStream fsout = fs.open(outDirectory);
				try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
					String line;
					while ((line = outIn.readLine()) != null) {
						String[] rcv = line.split(" ");
						long row = Long.parseLong(rcv[0]);
						long col = Long.parseLong(rcv[1]);
						double value = Double.parseDouble(rcv[2]);
						assertTrue("invalid row index", (row > 0 && row <= rows));
						assertTrue("invlaid column index", (col > 0 && col <= cols));
						assertTrue("invalid value", ((value >= min && value <= max) || value == 0));
					}
				}
			}
		} catch (IOException e) {
			fail("unable to read file: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Checks for matrix in directory existence.
	 * </p>
	 *
	 * @param outDir
	 *            directory
	 */
	public static void checkForOutputExistence(String outDir) {
		try {
			Path outDirectory = new Path(outDir);
			FileSystem fs = IOUtilFunctions.getFileSystem(outDirectory, conf);
			FileStatus[] outFiles = fs.listStatus(outDirectory);
			assertEquals("number of files in directory not 1", 1, outFiles.length);
			FSDataInputStream fsout = fs.open(outFiles[0].getPath());
			String outLine = null;
			try(BufferedReader outIn = new BufferedReader(new InputStreamReader(fsout))) {
				outLine = outIn.readLine();
			}
			assertNotNull("file is empty", outLine);
			assertTrue("file is empty", outLine.length() > 0);
		} catch (IOException e) {
			fail("unable to read " + outDir + ": " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Removes all the directories specified in the array in HDFS
	 * </p>
	 *
	 * @param directories
	 *            directories array
	 */
	public static void removeHDFSDirectories(String[] directories) {
		try {
			for (String directory : directories) {
				Path dir = new Path(directory);
				FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf);
				if (fs.exists(dir) && fs.getFileStatus(dir).isDirectory()) {
					fs.delete(dir, true);
				}
			}
		} catch (IOException e) {
		}
	}

	/**
	 * <p>
	 * Removes all the directories specified in the array in OS filesystem
	 * </p>
	 *
	 * @param directories
	 *            directories array
	 */
	public static void removeDirectories(String[] directories) {
		for (String directory : directories) {
			File dir = new File(directory);
			deleteDirectory(dir);
		}
	}

	private static boolean deleteDirectory(File path) {
		if (path.exists()) {
			File[] files = path.listFiles();
			for (int i = 0; i < files.length; i++) {
				if (files[i].isDirectory()) {
					deleteDirectory(files[i]);
				} else {
					files[i].delete();
				}
			}
		}
		return (path.delete());
	}

	/**
	 * <p>
	 * Removes all the files specified in the array in HDFS
	 * </p>
	 *
	 * @param files
	 *            files array
	 */
	public static void removeHDFSFiles(String[] files) {
		try {
			for (String directory : files) {
				Path dir = new Path(directory);
				FileSystem fs = IOUtilFunctions.getFileSystem(dir, conf);
				if (fs.exists(dir) && !fs.getFileStatus(dir).isDirectory()) {
					fs.delete(dir, false);
				}
			}
		} catch (IOException e) {
		}
	}

	/**
	 * <p>
	 * Removes all the files specified in the array in OS filesystem
	 * </p>
	 *
	 * @param files
	 *            files array
	 */
	public static void removeFiles(String[] files) {
		for (String directory : files) {
			File f = new File(directory);
			if (!f.exists() || !f.canWrite() || f.isDirectory())
				continue;

			f.delete();
		}
	}

	/**
	 * <p>
	 * Clears a complete directory.
	 * </p>
	 *
	 * @param directory
	 *            directory
	 */
	public static void clearDirectory(String directory) {
		try {
			Path path = new Path(directory);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			FileStatus[] directoryContent = fs.listStatus(path);
			for (FileStatus content : directoryContent) {
				fs.delete(content.getPath(), true);
			}
		} catch (IOException e) {
		}
	}

	/**
	 * <p>
	 * Generates a test matrix with the specified parameters as a two
	 * dimensional array.
	 * </p>
	 * <p>
	 * Set seed to -1 to use the current time as seed.
	 * </p>
	 *
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 * @param sparsity
	 *            sparsity
	 * @param seed
	 *            seed
	 * @return random matrix
	 */
	public static double[][] generateTestMatrix(int rows, int cols, double min, double max, double sparsity, long seed) {
		double[][] matrix = new double[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if (random.nextDouble() > sparsity)
					continue;
				matrix[i][j] = (random.nextDouble() * (max - min) + min);
			}
		}

		return matrix;
	}

	public static double[] generateTestVector(int cols, double min, double max, double sparsity, long seed) {
		double[] vector = new double[cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		for(int j = 0; j < cols; j++) {
			if(random.nextDouble() > sparsity)
				continue;
			vector[j] = (random.nextDouble() * (max - min) + min);
		}
		return vector;
	}

	/**
	 *
	 * Generates a test matrix with the specified parameters as a MatrixBlock.
	 *
	 * @param rows number of rows
	 * @param cols number of columns
	 * @param min minimum value
	 * @param max maximum value
	 * @param sparsity sparsity
	 * @param seed seed
	 * @return random MatrixBlock
	 */
	public static MatrixBlock generateTestMatrixBlock(int rows, int cols, double min, double max, double sparsity, long seed){
		return MatrixBlock.randOperations(rows, cols, sparsity, min, max, "Uniform", seed);
	}

	/**
	 *
	 * Generates a symmetric test matrix with the specified parameters as a MatrixBlock.
	 * TODO: generate the values without using the randOperation
	 *
	 * @param rows number of rows
	 * @param cols number of columns
	 * @param min minimum value
	 * @param max maximum value
	 * @param sparsity sparsity
	 * @param seed seed
	 * @return random symmetric MatrixBlock
	 */
	public static MatrixBlock generateTestMatrixBlockSym(int rows, int cols, double min, double max, double sparsity, long seed){
		MatrixBlock m = MatrixBlock.randOperations(rows, cols, sparsity, min, max, "Uniform", seed);
		for(int i = 0; i < rows; i++) {
			for(int j = i+1; j < cols; j++) {
				m.set(i,j, m.get(j,i));
			}
		}
		return m;
	}

	/**
	 * Generates a test matrix with the specified parameters as a two
	 * dimensional array.
	 * Set seed to -1 to use the current time as seed.
	 *
	 * @param rows number of rows
	 * @param cols number of columns
	 * @param min minimum value
	 * @param max maximum value
	 * @param sparsity sparsity
	 * @param seed seed
	 * @param delta The minimum delta between values.
	 * @return random matrix
	 */
	public static double[][] generateTestMatrix(int rows, int cols, double min, double max, double sparsity, long seed, double delta) {
		double[][] matrix = new double[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if (random.nextDouble() > sparsity)
					continue;
				double v = (random.nextDouble()) * (max - min);
				matrix[i][j] = (v + min ) - v % delta;
				matrix[i][j] = Math.round(matrix[i][j]  * 100.0) / 100.0;
			}
		}

		return matrix;
	}

	/**
	 *
	 * Generates a test matrix, but only containing real numbers, in the range specified.
	 *
	 * @param rows number of rows
	 * @param cols number of columns
	 * @param min minimum value whole number
	 * @param max maximum value whole number
	 * @param sparsity sparsity
	 * @param seed seed
	 * @return random matrix containing whole numbers in the range specified.
	 */
	public static int[][] generateTestMatrixIntV(int rows, int cols, int min, int max, double sparsity, long seed) {
		int[][] matrix = new int[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		if (max - min != 0){
			for (int i = 0; i < rows; i++) {
				for (int j = 0; j < cols; j++) {
					if (random.nextDouble() > sparsity)
						continue;
					matrix[i][j] = (random.nextInt((max - min)) + min);
				}
			}
		} else{
			for (int i = 0; i < rows; i++) {
				for (int j = 0; j < cols; j++) {
					if (random.nextDouble() > sparsity)
						continue;
					matrix[i][j] = max;
				}
			}
		}

		return matrix;
	}

	/**
	 * <p>
	 * Generates a test matrix with the specified parameters as a two
	 * dimensional array. The matrix will not contain any zero values.
	 * </p>
	 * <p>
	 * Set seed to -1 to use the current time as seed.
	 * </p>
	 *
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 * @param seed
	 *            seed
	 * @return random matrix
	 */
	public static double[][] generateNonZeroTestMatrix(int rows, int cols, double min, double max, long seed) {
		double[][] matrix = new double[rows][cols];
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				double randValue;
				do {
					randValue = random.nextDouble();
				} while (randValue == 0);
				matrix[i][j] = (randValue * (max - min) + min);
			}
		}

		return matrix;
	}

	/**
	 * <p>
	 * Generates a test matrix with the specified parameters and writes it to a
	 * file using the text format.
	 * </p>
	 * <p>
	 * Set seed to -1 to use the current time as seed.
	 * </p>
	 *
	 * @param file
	 *            output file
	 * @param rows
	 *            number of rows
	 * @param cols
	 *            number of columns
	 * @param min
	 *            minimum value
	 * @param max
	 *            maximum value
	 * @param sparsity
	 *            sparsity
	 * @param seed
	 *            seed
	 */
	public static void generateTestMatrixToFile(String file, int rows, int cols, double min, double max,
			double sparsity, long seed) {
		try {
			Path inFile = new Path(file);
			FileSystem fs = IOUtilFunctions.getFileSystem(inFile, conf);
			DataOutputStream out = fs.create(inFile);
			try( PrintWriter pw = new PrintWriter(out) ) {
				Random random = (seed == -1) ? TestUtils.random : new Random(seed);

				for (int i = 1; i <= rows; i++) {
					for (int j = 1; j <= cols; j++) {
						if (random.nextDouble() > sparsity)
							continue;
						double value = (random.nextDouble() * (max - min) + min);
						if (value != 0)
							pw.println(i + " " + j + " " + value);
					}
				}
			}
		} catch (IOException e) {
			fail("unable to write test matrix: " + e.getMessage());
		}
	}

	public static FrameBlock generateRandomFrameBlock(int rows, ValueType[] schema, Random random){
		FrameBlock frameBlock = new FrameBlock(schema);
		frameBlock.ensureAllocatedColumns(rows);
		for(int row = 0; row < rows; row++)
			for(int col = 0; col < schema.length; col++)
				frameBlock.set(row, col, generateRandomValueFromValueType(schema[col], random));
		return frameBlock;
	}

	public static FrameBlock generateRandomFrameBlock(int rows, ValueType[] schema, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomFrameBlock(rows, schema, random);
	}

	public static FrameBlock generateRandomFrameBlock(int rows, ValueType[] schema, long seed, double nullChance){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);

		FrameBlock frameBlock = new FrameBlock();
		for(int col = 0; col < schema.length; col++){
			Array<?> column = generateColumn(rows, schema[col], random, nullChance);
			frameBlock.appendColumn(column);
		}
		return frameBlock;
	}

	@SuppressWarnings("unchecked")
	private static Array<?> generateColumn(int rows, ValueType type, Random rand, double nullChance) {
		if(nullChance == 0) {
			switch(type) {
				case BOOLEAN:
					Array<Boolean> a = (Array<Boolean>) ArrayFactory.allocate(type, rows);
					for(int r = 0; r < rows; r++)
						a.set(r, rand.nextBoolean());
					return a;
				case CHARACTER:
					Array<Character> c = (Array<Character>) ArrayFactory.allocate(type, rows);
					for(int r = 0; r < rows; r++)
						c.set(r, rand.nextInt(Character.MAX_VALUE));
					return c;
				case FP32:
					Array<Float> f = (Array<Float>) ArrayFactory.allocate(type, rows);
					for(int r = 0; r < rows; r++)
						f.set(r, rand.nextFloat());
					return f;
				case FP64:
					Array<Double> d = (Array<Double>) ArrayFactory.allocate(type, rows);
					for(int r = 0; r < rows; r++)
						d.set(r, rand.nextDouble());
					return d;
				case INT32:
				case UINT4:
				case UINT8:
					Array<Integer> i = (Array<Integer>) ArrayFactory.allocate(type, rows);
					int limit = type == ValueType.UINT4 ? 16 : type == ValueType.UINT8 ? 256 : Integer.MAX_VALUE;
					for(int r = 0; r < rows; r++)
						i.set(r, rand.nextInt(limit));
					return i;
				case INT64:
					Array<Long> l = (Array<Long>) ArrayFactory.allocate(type, rows);
					for(int r = 0; r < rows; r++)
						l.set(r, rand.nextLong());
					return l;
				case STRING:
					StringArray s = (StringArray) ArrayFactory.allocate(type, rows);
					for(int r = 0; r < rows; r++) {
						String st = random.ints('a', 'z' + 1).limit(10)
							.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
						s.set(r, st);
					}
					return s;
				case UNKNOWN:
				default:
					throw new NotImplementedException();
			}

		}
		else {
			switch(type) {
				case BOOLEAN:
					OptionalArray<Boolean> a = (OptionalArray<Boolean>) ArrayFactory.allocateOptional(type, rows);
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							a.set(r, (Boolean) null);
						else
							a.set(r, rand.nextBoolean());
					}
					return a;
				case CHARACTER:
					OptionalArray<Character> c = (OptionalArray<Character>) ArrayFactory.allocateOptional(type, rows);
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							c.set(r, (Character) null);
						else
							c.set(r, rand.nextInt(Character.MAX_VALUE));
					}
					return c;
				case FP32:
					OptionalArray<Float> f = (OptionalArray<Float>) ArrayFactory.allocateOptional(type, rows);
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							f.set(r, (Float) null);
						else
							f.set(r, rand.nextFloat());
					}
					return f;
				case FP64:
					OptionalArray<Double> d = (OptionalArray<Double>) ArrayFactory.allocateOptional(type, rows);
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							d.set(r, (Double) null);
						else
							d.set(r, rand.nextDouble());
					}
					return d;
				case INT32:
				case UINT4:
				case UINT8:
					Array<Integer> i = (Array<Integer>) ArrayFactory.allocateOptional(type, rows);
					int limit = type == ValueType.UINT4 ? 16 : type == ValueType.UINT8 ? 256 : Integer.MAX_VALUE;
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							i.set(r, (Integer) null);
						else
							i.set(r, rand.nextInt(limit));
					}
					return i;
				case INT64:
					OptionalArray<Long> l = (OptionalArray<Long>) ArrayFactory.allocateOptional(type, rows);
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							l.set(r, (Long) null);
						else
							l.set(r, rand.nextLong());
					}
					return l;
				case STRING:
					StringArray s = (StringArray) ArrayFactory.allocateOptional(type, rows);
					for(int r = 0; r < rows; r++) {
						if(rand.nextDouble() < nullChance)
							s.set(r, (String) null);
						else {
							String st = random.ints('a', 'z' + 1).limit(10)
								.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
							s.set(r, st);
						}
					}
					return s;
				case UNKNOWN:
				default:
					throw new NotImplementedException();
			}
		}
	}

	public static FrameBlock generateRandomFrameBlock(int rows, int cols, long seed){
		ValueType[] schema = generateRandomSchema(cols, seed);
		return generateRandomFrameBlock(rows, schema ,seed);
	}

	public static FrameBlock generateRandomFrameBlockWithSchemaOfStrings(int rows, int cols, long seed){
		ValueType[] schema = generateRandomSchema(cols, seed);
		FrameBlock f =  generateRandomFrameBlock(rows, schema, seed);
		ValueType[] schemaString = UtilFunctions.nCopies(cols, ValueType.STRING);
		return FrameLibApplySchema.applySchema(f, schemaString);
	}

	/**
	 * <p>
	 *     Generates a random Schema with given params. With no type Unknown
	 * </p>
	 *
	 * @param size
	 * 				size of the schema
	 * @param random
	 * 				random Object
	 */
	public static ValueType[] generateRandomSchema(int size, Random random){
		final List<ValueType> valueTypes = new ArrayList<>();
		for(ValueType v : ValueType.values())
			if(v != ValueType.UNKNOWN)
				valueTypes.add(v);
		
		ValueType[] newSchema = new ValueType[size];
		for(int i = 0; i < size; i++)
			newSchema[i] = valueTypes.get(random.nextInt(valueTypes.size()));
		
		return newSchema;
	}

	public static ValueType[] generateRandomSchema(int size, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomSchema(size, random);
	}

	/**
	 * <p>
	 *     Generates a random SchemaMap with given params. Maximum name length per name is 10
	 * </p>
	 *
	 * @param size
	 * 				size of the schemaMap
	 * @param random
	 * 				random Object
	 */
	public static Map<String, Integer> generateRandomSchemaMap(int size, Random random){
		Map<String, Integer> schemaMap = new HashMap<>();
		List<String> generatedPaths = new ArrayList<>();
		for(int k = 0; k < (size/2) + 1; k++){
			generatedPaths.add(generateRandomJSONPath(0, random));
		}
		while(generatedPaths.size() < size){
			generateRandomJSONPaths(generatedPaths, random, size - generatedPaths.size());
		}
		for(int i = 0; i < generatedPaths.size(); i++){
			schemaMap.put(generatedPaths.get(i), i);
		}
		return schemaMap;
	}

	public static Map<String, Integer> generateRandomSchemaMap(int size, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomSchemaMap(size, random);
	}



	/**
	 * <p>
	 *     Generates a random JSON paths from a existing set of paths, Function is probabilistic so may have to be
	 *     repeated to get the exact number of paths in size
	 * </p>
	 *
	 * @param size
	 * 				extrapolates the given paths to a MAXIMUM of size paths
	 * @param random
	 * 				random object
	 */
	public static List<String> generateRandomJSONPaths(List<String> paths, Random random, int size){
		List<String> newPaths = new LinkedList<>();
		if(paths.size() == 0 || size <= 0){
			return newPaths;
		}
		int pathslen = paths.size();
		for(int i = 0; i < pathslen; i++){
			String base = paths.get(i);
			int subEntries = random.nextInt(5) + 2;
			for(int c = 0; c < subEntries && size > 0; c++){
				String sub = base + generateRandomJSONPath(0, random);
				paths.add(sub);
				if(c == 0){
					paths.remove(base);
					pathslen--;
					size++;
				}
				size--;
				if(random.nextBoolean()){
					newPaths.add(sub);
					paths.remove(sub);
					size++;
					pathslen--;
				}
			}

		}
		List<String> ret = generateRandomJSONPaths(newPaths, random, size - newPaths.size());
		paths.addAll(ret);
		return paths;
	}

	public static List<String> generateRandomJSONPaths(List<String> paths, long seed, int size){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomJSONPaths(paths, random, size);
	}

	/**
	 * <p>
	 *     Generates a random JSON path
	 * </p>
	 *
	 * @param len
	 * 				length of the new path = len + 1
	 * @param random
	 * 				random Object
	 */
	public static String generateRandomJSONPath(int len, Random random){
		String current = "/" + random.ints('a', 'z' + 1).limit(10).collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
		if(len == 0){
			return current;
		}
		return current + generateRandomJSONPath(len - 1, random);
	}

	public static String generateRandomJSONPath(int len, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomJSONPath(len, random);
	}
	/**
	 * <p>
	 *     Generates a random value for a given Value Type
	 * </p>
	 *
	 * @param valueType
	 * 				the ValueType of which to generate the value
	 * @param random
	 * 				random Object
	 */
	public static Object generateRandomValueFromValueType(ValueType valueType, Random random){
		switch (valueType){
			case UINT4:   return random.nextInt(16);
			case UINT8:   return random.nextInt(256);
			case FP32:	  return random.nextFloat();
			case FP64:    return random.nextDouble();
			case INT32:   return random.nextInt();
			case INT64:   return random.nextLong() % ((long)Integer.MAX_VALUE * 10L); 
			case BOOLEAN: return random.nextBoolean();
			case HASH32:  return Integer.toHexString(random.nextInt());
			case HASH64:  return Long.toHexString(random.nextLong());
			case STRING:
				return random.ints('a', 'z' + 1)
						.limit(10)
						.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
						.toString();
			default:
				return null;
		}
	}

	public static Object generateRandomValueFromValueType(ValueType valueType, long seed){
		Random random = (seed == -1) ? TestUtils.random : new Random(seed);
		return generateRandomValueFromValueType(valueType, random);
	}

	/**
	 * Counts the number of NNZ values in a matrix
	 *
	 * @param matrix
	 * @return
	 */
	public static int countNNZ(double[][] matrix) {
		int n = 0;
		for (int i = 0; i < matrix.length; i++) {
			for (int j = 0; j < matrix[0].length; j++) {
				if (matrix[i][j] != 0)
					n++;
			}
		}
		return n;
	}

	public static void writeCSVTestMatrix(String file, double[][] matrix)
	{
		try
		{
			//create outputstream to HDFS / FS and writer
			Path path = new Path(file);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			DataOutputStream out = fs.create(path, true);
			try( BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {
				//writer actual matrix
				StringBuilder sb = new StringBuilder();
				for (int i = 0; i < matrix.length; i++) {
					sb.setLength(0);
					if ( matrix[i][0] != 0 )
						sb.append(matrix[i][0]);
					for (int j = 1; j < matrix[i].length; j++) {
						sb.append(",");
						if ( matrix[i][j] == 0 )
							continue;
						sb.append(matrix[i][j]);
					}
					sb.append('\n');
					pw.append(sb.toString());
				}
			}
		}
		catch (IOException e)
		{
			fail("unable to write (csv) test matrix (" + file + "): " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Writes a matrix to a file using the text format.
	 * </p>
	 *
	 * @param file
	 *            file name
	 * @param matrix
	 *            matrix
	 * @param isR
	 *            when true, writes a R matrix to disk
	 *
	 */
	public static void writeTestMatrix(String file, double[][] matrix, boolean isR)
	{
		try
		{
			//create outputstream to HDFS / FS and writer
			DataOutputStream out = null;
			if (!isR) {
				Path path = new Path(file);
				FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
				out = fs.create(path, true);
			}
			else {
				out = new DataOutputStream(new FileOutputStream(file));
			}

			try( BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {

				//write header
				if( isR ) {
					/** add R header */
					pw.append("%%MatrixMarket matrix coordinate real general\n");
					pw.append("" + matrix.length + " " + matrix[0].length + " " + matrix.length*matrix[0].length+"\n");
				}

				//writer actual matrix
				StringBuilder sb = new StringBuilder();
				boolean emptyOutput = true;
				for (int i = 0; i < matrix.length; i++) {
					for (int j = 0; j < matrix[i].length; j++) {
						if ( matrix[i][j] == 0 )
							continue;
						sb.append(i + 1);
						sb.append(' ');
						sb.append(j + 1);
						sb.append(' ');
						sb.append(matrix[i][j]);
						sb.append('\n');
						pw.append(sb.toString());
						sb.setLength(0);
						emptyOutput = false;
					}
				}

				//writer dummy entry if empty
				if( emptyOutput )
					pw.append("1 1 " + matrix[0][0]);
			}
		}
		catch (IOException e)
		{
			fail("unable to write test matrix (" + file + "): " + e.getMessage());
		}
	}


	protected static void writeCSV(String completePath, double[][] matrix, boolean header) throws IOException{
		Path path = new Path(completePath);
		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
		DataOutputStream out = fs.create(path, true);
		try(BufferedWriter pw = new BufferedWriter(new OutputStreamWriter(out))) {

			if(header) {
				pw.append("d0");
				for(int i = 1; i < matrix[0].length; i++) {
					pw.append(",d" + i);
				}
				pw.append("\n");
			}
			for(int j = 0; j < matrix.length; j++) {
				pw.append("" + matrix[j][0]);
				for(int i = 1; i < matrix[j].length; i++) {
					pw.append("," + matrix[j][i]);
				}
				pw.append("\n");
			}
		}
	}


	/**
	 * <p>
	 * Writes a matrix to a file using the text format.
	 * </p>
	 *
	 * @param file
	 *            file name
	 * @param matrix
	 *            matrix
	 */
	public static void writeTestMatrix(String file, double[][] matrix) {
		writeTestMatrix(file, matrix, false);
	}


	/**
	 * <p>
	 * Writes a frame to a file using the text format.
	 * </p>
	 *
	 * @param file
	 *            file name
	 * @param data
	 *            frame data
	 * @param isR
	 * @throws IOException
	 */
	public static void writeTestFrame(String file, double[][] data, ValueType[] schema, FileFormat fmt, boolean isR) throws IOException {
		FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt);
		FrameBlock frame = new FrameBlock(schema);
		initFrameData(frame, data, schema, data.length);
		writer.writeFrameToHDFS(frame, file, data.length, schema.length);
	}

	public static void writeTestFrame(String file, FrameBlock data, ValueType[] schema, FileFormat fmt, boolean isR) throws IOException {
		FrameWriter writer = FrameWriterFactory.createFrameWriter(fmt);
		writer.writeFrameToHDFS(data, file, data.getNumRows(), schema.length);
	}

	/**
	 * <p>
	 * Writes a frame to a file using the text format.
	 * </p>
	 *
	 * @param file
	 *            file name
	 * @param data
	 *            frame data
	 * @throws IOException
	 */
	public static void writeTestFrame(String file, double[][] data, ValueType[] schema, FileFormat fmt) throws IOException {
		writeTestFrame(file, data, schema, fmt, false);
	}

	public static void writeTestFrame(String file, FrameBlock data, ValueType[] schema, FileFormat fmt) throws IOException {
		writeTestFrame(file, data, schema, fmt, false);
	}

	public static void initFrameData(FrameBlock frame, double[][] data, ValueType[] lschema, int rows) {
		Object[] row1 = new Object[lschema.length];
		for( int i=0; i<rows; i++ ) {
			for( int j=0; j<lschema.length; j++ ) {
				data[i][j] = UtilFunctions.objectToDouble(lschema[j],
						row1[j] = UtilFunctions.doubleToObject(lschema[j], data[i][j]));
				if(row1[j] != null && lschema[j] == ValueType.STRING)
					row1[j] = "Str" + row1[j];
			}
			frame.appendRow(row1);
		}
	}


	/**
	 * Write scalar to file
	 * 
	 * @param file  File to write to
	 * @param value Value to write
	 */
	public static void writeTestScalar(String file, double value) {
		try {
			DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
			try(PrintWriter pw = new PrintWriter(out)) {
				pw.println(value);
			}
		}
		catch(IOException e) {
			fail("unable to write test scalar (" + file + "): " + e.getMessage());
		}
	}

	/**
	 * Write scalar to file
	 * 
	 * @param file  File to write to
	 * @param value Value to write
	 */
	public static void writeTestScalar(String file, long value) {
		try {
			DataOutputStream out = new DataOutputStream(new FileOutputStream(file));
			try(PrintWriter pw = new PrintWriter(out)) {
				pw.println(value);
			}
		}
		catch(IOException e) {
			fail("unable to write test scalar (" + file + "): " + e.getMessage());
		}
	}

	/**
	 * Writes a matrix to a file using the binary cells format.
	 * 
	 * @param file   file name
	 * @param matrix matrix
	 */
	public static void writeBinaryTestMatrixCells(String file, double[][] matrix) {
		try {
			final Writer writer = IOUtilFunctions.getSeqWriterCell(new Path(file), conf, 1);
			try {
				MatrixIndexes index = new MatrixIndexes();
				MatrixCell value = new MatrixCell();
				for (int i = 0; i < matrix.length; i++) {
					for (int j = 0; j < matrix[i].length; j++) {
						if (matrix[i][j] != 0) {
							index.setIndexes((i + 1), (j + 1));
							value.setValue(matrix[i][j]);
							writer.append(index, value);
						}
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(writer);
			}
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to write test matrix: " + e.getMessage());
		}
	}

	/**
	 *  Writes a matrix to a file using the binary blocks format. 
	 *
	 * @param file         file name
	 * @param matrix       matrix
	 * @param rowsInBlock  rows in block
	 * @param colsInBlock  columns in block
	 * @param sparseFormat sparse format
	 */
	public static void writeBinaryTestMatrixBlocks(String file, double[][] matrix, int rowsInBlock, int colsInBlock,
			boolean sparseFormat) {

		try {
			final Writer writer = IOUtilFunctions.getSeqWriter(new Path(file), conf, 1);
			try{
				MatrixIndexes index = new MatrixIndexes();
				MatrixBlock value = new MatrixBlock();
				for (int i = 0; i < matrix.length; i += rowsInBlock) {
					int rows = Math.min(rowsInBlock, (matrix.length - i));
					for (int j = 0; j < matrix[i].length; j += colsInBlock) {
						int cols = Math.min(colsInBlock, (matrix[i].length - j));
						index.setIndexes(((i / rowsInBlock) + 1), ((j / colsInBlock) + 1));
						value.reset(rows, cols, sparseFormat);
						for (int k = 0; k < rows; k++) {
							for (int l = 0; l < cols; l++) {
								value.set(k, l, matrix[i + k][j + l]);
							}
						}
						writer.append(index, value);
					}
				}
			}
			finally {
				IOUtilFunctions.closeSilently(writer);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to write test matrix: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Prints out a DML script.
	 * </p>
	 *
	 * @param dmlScriptFile
	 *            filename of DML script
	 */
	public static void printDMLScript(String dmlScriptFile) {
		System.out.println("Running script: " + dmlScriptFile + "\n");
		System.out.println("******************* DML script *******************");
		try(BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(dmlScriptFile)))) {
			String content;
			while ((content = in.readLine()) != null) {
				System.out.println(content);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to print dml script: " + e.getMessage());
		}
		System.out.println("**************************************************\n\n");
	}

	/**
	 * <p>
	 * Prints out a PYDML script.
	 * </p>
	 *
	 * @param pydmlScriptFile
	 *            filename of PYDML script
	 */
	public static void printPYDMLScript(String pydmlScriptFile) {
		System.out.println("Running script: " + pydmlScriptFile + "\n");
		System.out.println("******************* PYDML script *******************");
		try(BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(pydmlScriptFile))) ) {
			String content;
			while ((content = in.readLine()) != null) {
				System.out.println(content);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to print pydml script: " + e.getMessage());
		}
		System.out.println("**************************************************\n\n");
	}

	/**
	 * <p>
	 * Prints out an R script.
	 * </p>
	 *
	 * @param dmlScriptFile
	 *            filename of RL script
	 */
	public static void printRScript(String dmlScriptFile) {
		System.out.println("Running script: " + dmlScriptFile + "\n");
		System.out.println("******************* R script *******************");
		try( BufferedReader in = new BufferedReader(new InputStreamReader(new FileInputStream(dmlScriptFile)))) {
			String content;
			while ((content = in.readLine()) != null) {
				System.out.println(content);
			}
		}
		catch (IOException e) {
			e.printStackTrace();
			fail("unable to print R script: " + e.getMessage());
		}
		System.out.println("**************************************************\n\n");
	}

	/**
	 * <p>
	 * Renames a temporary DML script file back to it's original name.
	 * </p>
	 *
	 * @param dmlScriptFile
	 *            temporary script file
	 */
	public static void renameTempDMLScript(String dmlScriptFile) {
		File oldPath = new File(dmlScriptFile + "t");
		File newPath = new File(dmlScriptFile);
		oldPath.renameTo(newPath);
	}

	/**
	 * <p>
	 * Removes all temporary files and directories in the current working
	 * directory.
	 * </p>
	 */
	public static void removeTemporaryFiles() {
		try {
			Path workingDir = new Path(".");
			FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf);
			FileStatus[] files = fs.listStatus(workingDir);
			for (FileStatus file : files) {
				String fileName = file.getPath().toString().substring(
						file.getPath().getParent().toString().length() + 1);
				if (fileName.contains("temp"))
					fs.delete(file.getPath(), false);
			}
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to remove temporary files: " + e.getMessage());
		}
	}

	/**
	 * <p>
	 * Checks if any temporary files or directories exist in the current working
	 * directory.
	 * </p>
	 *
	 * @return true if temporary files or directories are available
	 */
	public static boolean checkForTemporaryFiles() {
		try {
			Path workingDir = new Path(".");
			FileSystem fs = IOUtilFunctions.getFileSystem(workingDir, conf);
			FileStatus[] files = fs.listStatus(workingDir);
			for (FileStatus file : files) {
				String fileName = file.getPath().toString().substring(
						file.getPath().getParent().toString().length() + 1);
				if (fileName.contains("temp"))
					return true;
			}
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to remove temporary files: " + e.getMessage());
		}

		return false;
	}

	/**
	 * <p>
	 * Returns the path to a file in a directory if it is the only file in the
	 * directory.
	 * </p>
	 *
	 * @param directory
	 *            directory containing the file
	 * @return path of the file
	 */
	public static Path getFileInDirectory(String directory) {
		try {
			Path path = new Path(directory);
			FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
			FileStatus[] files = fs.listStatus(path);
			if (files.length != 1)
				throw new IOException("requires exactly one file in directory " + directory);

			return files[0].getPath();
		} catch (IOException e) {
			e.printStackTrace();
			fail("unable to open file in " + directory);
		}

		return null;
	}

	/**
	 * <p>
	 * Creates an empty file.
	 * </p>
	 *
	 * @param filename
	 *            filename
	 */
	public static void createFile(String filename) throws IOException {
		Path path = new Path(filename);
		FileSystem fs = IOUtilFunctions.getFileSystem(path, conf);
		fs.create(path);
	}

	/**
	 * <p>
	 * Performs transpose onto a matrix and returns the result.
	 * </p>
	 *
	 * @param a
	 *            matrix
	 * @return transposed matrix
	 */
	public static double[][] performTranspose(double[][] a) {
		int rows = a[0].length;
		int cols = a.length;
		double[][] result = new double[rows][cols];

		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				result[i][j] = a[j][i];
			}
		}

		return result;
	}

	/**
	 * <p>
	 * Performs matrix multiplication onto two matrices and returns the result.
	 * </p>
	 *
	 * @param a
	 *            left matrix
	 * @param b
	 *            right matrix
	 * @return computed result
	 */
	public static double[][] performMatrixMultiplication(double[][] a, double[][] b) {
		int m = a.length;
		int n = a[0].length;
		int l = b[0].length;
		double[][] result = new double[m][l];

		for (int i = 0; i < m; i++) {
			for (int j = 0; j < l; j++) {
				double value = 0;
				for (int k = 0; k < n; k++) {
					value += (a[i][k] * b[k][j]);
				}
				result[i][j] = value;
			}
		}

		return result;
	}

	/**
	 * <p>
	 * Returns a random integer value.
	 * </p>
	 *
	 * @return random integer value
	 */
	public static int getRandomInt() {
		Random random = new Random(System.currentTimeMillis());
		int randomValue = random.nextInt();
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a positive random integer value.
	 * </p>
	 *
	 * @return positive random integer value
	 */
	public static int getPositiveRandomInt() {
		int randomValue = TestUtils.getRandomInt();
		if (randomValue < 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a negative random integer value.
	 * </p>
	 *
	 * @return negative random integer value
	 */
	public static int getNegativeRandomInt() {
		int randomValue = TestUtils.getRandomInt();
		if (randomValue > 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a random double value.
	 * </p>
	 *
	 * @return random double value
	 */
	public static double getRandomDouble() {
		Random random = new Random(System.currentTimeMillis());
		double randomValue = random.nextInt() * random.nextDouble();
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a positive random double value.
	 * </p>
	 *
	 * @return positive random double value
	 */
	public static double getPositiveRandomDouble() {
		double randomValue = TestUtils.getRandomDouble();
		if (randomValue < 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns a negative random double value.
	 * </p>
	 *
	 * @return negative random double value
	 */
	public static double getNegativeRandomDouble() {
		double randomValue = TestUtils.getRandomDouble();
		if (randomValue > 0)
			randomValue = -randomValue;
		return randomValue;
	}

	/**
	 * <p>
	 * Returns the string representation of a double value which can be used in
	 * a DML script.
	 * </p>
	 *
	 * @param value
	 *            double value
	 * @return string representation
	 */
	public static String getStringRepresentationForDouble(double value) {
		NumberFormat nf = NumberFormat.getInstance(new Locale("EN"));
		nf.setGroupingUsed(false);
		nf.setMinimumFractionDigits(1);
		nf.setMaximumFractionDigits(20);
		return nf.format(value);
	}

	public static void replaceRandom( double[][] A, int rows, int cols, double replacement, int len ) {
		Random rand = new Random();
		for( int i=0; i<len; i++ )
			A[rand.nextInt(rows-1)][rand.nextInt(cols-1)] = replacement;
	}

	/**
	 * Clears internal assertion information storage
	 */
	public static void clearAssertionInformation() {
		_AssertInfos.clear();
		_AssertOccured = false;
	}

	/**
	 * <p>
	 * Generates a matrix containing easy to debug values in its cells.
	 * </p>
	 *
	 * @param rows
	 * @param cols
	 * @param bContainsZeros
	 *            If true, the matrix contains zeros. If false, the matrix
	 *            contains only positive values.
	 * @return
	 */
	public static double[][] createNonRandomMatrixValues(int rows, int cols, boolean bContainsZeros) {
		double[][] matrix = new double[rows][cols];
		for (int i = 0; i < rows; i++) {
			for (int j = 0; j < cols; j++) {
				if (!bContainsZeros)
					matrix[i][j] = (i + 1) * 10 + (j + 1);
				else
					matrix[i][j] = (i) * 10 + (j);
			}
		}
		return matrix;
	}

	public static double[][] round(double[][] data) {
		for(int i=0; i<data.length; i++)
			for(int j=0; j<data[i].length; j++)
				data[i][j]=Math.round(data[i][j]);
		return data;
	}

	public static double[][] round(double[][] data, int col) {
		for(int i=0; i<data.length; i++)
			data[i][col]=Math.round(data[i][col]);
		return data;
	}

	public static MatrixBlock round(MatrixBlock data) {
		return data.unaryOperations(new UnaryOperator(Builtin.getBuiltinFnObject(BuiltinCode.ROUND), 2, true), null);
	}

	public static MatrixBlock ceil(MatrixBlock data) {
		return data.unaryOperations(new UnaryOperator(Builtin.getBuiltinFnObject(BuiltinCode.CEIL), 2, true), null);
	}

	public static MatrixBlock floor(MatrixBlock data) {
		return data.unaryOperations(new UnaryOperator(Builtin.getBuiltinFnObject(BuiltinCode.FLOOR), 2, true), null);
	}

	public static double[][] floor(double[][] data) {
		for(int i=0; i<data.length; i++)
			for(int j=0; j<data[i].length; j++)
				data[i][j]=Math.floor(data[i][j]);
		return data;
	}

	public static double[][] ceil(double[][] data) {
		for(int i=0; i<data.length; i++)
			for(int j=0; j<data[i].length; j++)
				data[i][j]=Math.ceil(data[i][j]);
		return data;
	}

	public static double[] ceil(double[] data) {
		for(int i = 0; i < data.length; i++)
			data[i] = Math.ceil(data[i]);
		return data;
	}

	public static double[][] floor(double[][] data, int col) {
		for(int i=0; i<data.length; i++)
			data[i][col]=Math.floor(data[i][col]);
		return data;
	}

	public static double sum(double[][] data, int rows, int cols) {
		double sum = 0;
		for (int i = 0; i< rows; i++){
			for (int j = 0; j < cols; j++){
				sum += data[i][j];
			}
		}
		return sum;
	}

	public static long computeNNZ(double[][] data) {
		long nnz = 0;
		for(int i=0; i<data.length; i++)
			nnz += UtilFunctions.computeNnz(data[i], 0, data[i].length);
		return nnz;
	}

	public static double[][] seq(int from, int to, int incr) {
		int len = (int)UtilFunctions.getSeqLength(from, to, incr);
		double[][] ret = new double[len][1];
		for(int i=0, val=from; val<=to; i++, val+=incr)
			ret[i][0] = val;
		return ret;
	}

	public static void shutdownThreads(Thread... ts) {
		for( Thread t : ts )
			shutdownThread(t);
	}

	public static void shutdownThreads(Process... ts) {
		for( Process t : ts ){
			if (t != null)
				shutdownThread(t);
		}
	}

	public static void shutdownThread(Thread t) {
		// kill the worker
		if( t != null ) {
			t.interrupt();
			try {
				t.join();
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static void shutdownThread(Process t) {
		// kill the worker
		if( t != null ) {
			Process d = t.destroyForcibly();
			try {
				d.waitFor();
			}
			catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public static String federatedAddress(int port, String input) {
		return federatedAddress("localhost", port, input);
	}

	public static String federatedAddress(String host, int port, String input) {
		return host + ':' + port + '/' + input;
	}

	public static String federatedAddressNoInput(String host, int port) {
		return host + ':' + port;
	}

	public static double gaussian_probability (double point)
	//  "Handbook of Mathematical Functions", ed. by M. Abramowitz and I.A. Stegun,
	//  U.S. Nat-l Bureau of Standards, 10th print (Dec 1972), Sec. 7.1.26, p. 299
	{
		double t_gp = 1.0 / (1.0 + Math.abs (point) * 0.231641888);  // 0.231641888 = 0.3275911 / sqrt (2.0)
		double erf_gp = 1.0 - t_gp * ( 0.254829592
				+ t_gp * (-0.284496736
				+ t_gp * ( 1.421413741
				+ t_gp * (-1.453152027
				+ t_gp *   1.061405429)))) * Math.exp (- point * point / 2.0);
		erf_gp = erf_gp * (point > 0 ? 1.0 : -1.0);
		return (0.5 + 0.5 * erf_gp);
	}

	public static double logFactorial (double x)
	//  From paper: C. Lanczos "A Precision Approximation of the Gamma Function",
	//  Journal of the SIAM: Numerical Analysis, Series B, Vol. 1, 1964, pp. 86-96
	{
		final double[] cf = {1.000000000178, 76.180091729406, -86.505320327112,
				24.014098222230, -1.231739516140, 0.001208580030, -0.000005363820};
		double a_5 = cf[0] + cf[1] / (x + 1) + cf[2] / (x + 2) + cf[3] / (x + 3)
				+ cf[4] / (x + 4) + cf[5] / (x + 5) + cf[6] / (x + 6);
		return Math.log(a_5) + (x + 0.5) * Math.log(x + 5.5) - (x + 5.5) + 0.91893853320467; // log(sqrt(2 * PI))
	}

	public static long nextPoisson (Random r, double mu)
	// Prob[k] = mu^k * exp(-mu) / k!
	// The main part is from W. H"ormann "The Transformed Rejection Method
	// for Generating Poisson Random Variables"
	{
		if (mu <= 0.0)
			return 0;
		if (mu >= 100000.0)
			return Math.round (mu + Math.sqrt (mu) * r.nextGaussian ());
		if (mu >= 10.0)
		{
			long output = 0;
			double c = mu + 0.445;
			double b = 0.931 + 2.53 * Math.sqrt (mu);
			double a = -0.059 + 0.02483 * b;
			double one_by_alpha = 1.1239 + 1.1328 / (b - 3.4);
			double u_r = 0.43;
			double v_r = 0.9277 - 3.6224 / (b - 2);
			while (true)
			{
				double U;
				double V = r.nextDouble ();
				if (V <= 2 * u_r * v_r)
				{
					U = V / v_r - u_r;
					output = (long) Math.floor ((2 * a / (0.5 - Math.abs (U)) + b) * U + c);
					break;
				}
				if (V >= v_r)
				{
					U = r.nextDouble () - 0.5;
				}
				else
				{
					U = V / v_r - (u_r + 0.5);
					U = Math.signum (U) * 0.5 - U;
					V = v_r * r.nextDouble ();
				}
				double us = 0.5 - Math.abs (U);
				if (0.487 < Math.abs (U) && us < V)
					continue;
				long k = (long) Math.floor ((2 * a / us + b) * U + c);
				double V_to_compare = (V * one_by_alpha) / (a / us / us + b);
				if (0 <= k &&  Math.log (V_to_compare) <= - mu + k * Math.log (mu) - TestUtils.logFactorial (k))
				{
					output = k;
					break;
				}
			}
			return output;
		}
		long count = 0;
		double res_mu = mu;
		while (res_mu > 0.0)
		{
			count ++;
			res_mu += Math.log (r.nextDouble ());
		}
		return count - 1;
	}

	public static double nextGamma (Random r, double alpha)
	// PDF(x) = x^(alpha-1) * exp(-x) / Gamma(alpha)
	// D.Knuth "The Art of Computer Programming", 2nd Edition, Vol. 2, Sec. 3.4.1
	{
		double x;
		if (alpha > 10000.0)
		{
			x = 1.0 - 1.0 / (9.0 * alpha) + r.nextGaussian() / Math.sqrt (9.0 * alpha);
			return alpha * x * x * x;
		}
		else if (alpha > 5.0)
		{
			x = 0.0;
			double the_root = Math.sqrt (2.0 * alpha - 1.0);
			boolean is_accepted = false;
			while (! is_accepted)
			{
				double y = Math.tan (Math.PI * r.nextDouble());
				x = the_root * y + alpha - 1.0;
				if (x <= 0)
					continue;
				double z = Math.exp ((alpha - 1.0) * (1.0 + Math.log (x / (alpha - 1.0))) - x);
				is_accepted = (r.nextDouble() <= z * (1.0 + y * y));
			}
			return x;
		}
		else if (alpha > 0.0)
		{
			x = 1.0;
			double frac_alpha = alpha;
			while (frac_alpha >= 1.0)
			{
				x *= r.nextDouble ();
				frac_alpha -= 1.0;
			}
			double output = - Math.log (x);
			if (frac_alpha > 0.0)  // Has to be between 0 and 1
			{
				double ceee = Math.E / (frac_alpha + Math.E);
				boolean is_accepted = false;
				while (! is_accepted)
				{
					double u = r.nextDouble();
					if (u <= ceee)
					{
						x = Math.pow (u / ceee, 1.0 / frac_alpha);
						is_accepted = (r.nextDouble() <= Math.exp (- x));
					}
					else
					{
						x = 1.0 - Math.log ((1.0 - u) / (1.0 - ceee));
						is_accepted = (r.nextDouble() <= Math.pow (x, frac_alpha - 1.0));
					}
				}
				output += x;
			}
			return output;
		}
		else  //  alpha <= 0.0
			return 0.0;
	}

	public static double[] scaleWeights (double[] w_unscaled, double[][] X, double icept, double meanLF, double sigmaLF)
	{
		int rows = X.length;
		int cols = w_unscaled.length;
		double[] w = new double [cols];
		for (int j = 0; j < cols; j ++)
			w [j] = w_unscaled [j];

		double sum_wx = 0.0;
		double sum_1x = 0.0;
		double sum_wxwx = 0.0;
		double sum_1x1x = 0.0;
		double sum_wx1x = 0.0;

		for (int i = 0; i < rows; i ++)
		{
			double wx = 0.0;
			double one_x = 0.0;
			for (int j = 0; j < cols; j ++)
			{
				wx += w [j] * X [i][j];
				one_x += X [i][j];
			}
			sum_wx += wx;
			sum_1x += one_x;
			sum_wxwx += wx * wx;
			sum_1x1x += one_x * one_x;
			sum_wx1x += wx * one_x;
		}

		double a0 = (meanLF - icept) * rows * sum_wx / (sum_wx * sum_wx + sum_1x * sum_1x);
		double b0 = (meanLF - icept) * rows * sum_1x / (sum_wx * sum_wx + sum_1x * sum_1x);
		double a1 = sum_1x;
		double b1 = - sum_wx;
		double qA = a1 * a1 * sum_wxwx + 2 * a1 * b1 * sum_wx1x + b1 * b1 * sum_1x1x;
		double qB = 2 * (a0 * a1 * sum_wxwx + a0 * b1 * sum_wx1x + a1 * b0 * sum_wx1x + b0 * b1 * sum_1x1x);
		double qC_nosigmaLF = a0 * a0 * sum_wxwx + 2 * a0 * b0 * sum_wx1x + b0 * b0 * sum_1x1x - rows * (meanLF - icept) * (meanLF - icept);
		double qC = qC_nosigmaLF - rows * sigmaLF * sigmaLF;
		double qD = qB * qB - 4 * qA * qC;
		if (qD < 0)
		{
			double new_sigmaLF = Math.sqrt (qC_nosigmaLF / rows - qB * qB / (4 * qA * rows));
			String error_message = String.format ("Cannot generate the weights: linear form variance demand is too tight!  Try sigmaLF >%8.4f", new_sigmaLF);
			System.out.println (error_message);
			System.out.flush ();
			throw new IllegalArgumentException (error_message);
		}
		double t = (- qB + Math.sqrt (qD)) / (2 * qA);
		double a = a0 + t * a1;
		double b = b0 + t * b1;
		for (int j = 0; j < cols; j ++)
			w [j] = a * w [j] + b;

		double sum_eta = 0.0;
		double sum_sq_eta = 0.0;
		for (int i = 0; i < rows; i ++)
		{
			double eta = 0.0;
			for (int j = 0; j < cols; j ++)
				eta +=  w [j] * X [i][j];
			sum_eta += eta;
			sum_sq_eta += eta * eta;
		}
		double mean_eta = icept + sum_eta / rows;
		double sigma_eta = Math.sqrt ((sum_sq_eta - sum_eta * sum_eta / rows) / (rows - 1));
		System.out.println (String.format ("Linear Form Mean  =%8.4f (Desired:%8.4f)",  mean_eta, meanLF));
		System.out.println (String.format ("Linear Form Sigma =%8.4f (Desired:%8.4f)", sigma_eta, sigmaLF));

		return w;
	}
	public static class GLMDist
	{
		final int dist;         // GLM distribution family type
		final double param;     // GLM parameter, typically variance power of the mean
		final int link;         // GLM link function type
		final double link_pow;  // GLM link function as power of the mean
		double dispersion = 1.0;
		long binom_n = 1;

		public GLMDist (int _dist, double _param, int _link, double _link_pow) {
			dist = _dist; param = _param; link = _link; link_pow = _link_pow;
		}

		public void set_dispersion (double _dispersion) {
			dispersion = _dispersion;
		}

		public void set_binom_n (long _n) {
			binom_n = _n;
		}

		public boolean is_binom_n_needed () {
			return (dist == 2 && param == 1.0);
		}

		public double nextGLM (Random r, double eta) {
			double mu = 0.0;
			switch (link) {
				case 1: // LINK: POWER
					if (link_pow == 0.0)	   // LINK: log
						mu = Math.exp (eta);
					else if (link_pow ==  1.0) // LINK: identity
						mu = eta;
					else if (link_pow == -1.0) // LINK: inverse
						mu = 1.0 / eta;
					else if (link_pow ==  0.5) // LINK: sqrt
						mu = eta * eta;
					else if (link_pow == -2.0) // LINK: 1/mu^2
						mu = Math.sqrt (1.0 / eta);
					else
						mu = Math.pow (eta, 1.0 / link_pow);
					break;
				case 2: // LINK: logit
					mu = 1.0 / (1.0 + Math.exp (- eta));
					break;
				case 3: // LINK: probit
					mu = TestUtils.gaussian_probability (eta);
					break;
				case 4: // LINK: cloglog
					mu = 1.0 - Math.exp (- Math.exp (eta));
					break;
				case 5: // LINK: cauchit
					mu = 0.5 + Math.atan (eta) / Math.PI;
					break;
				default:
					mu = 0.0;
			}

			double output = 0.0;
			if (dist == 1)  // POWER
			{
				double var_pow = param;
				if (var_pow == 0.0) // Gaussian, with dispersion = sigma^2
				{
					output = mu + Math.sqrt (dispersion) * r.nextGaussian ();
				}
				else if (var_pow == 1.0) // Poisson; Negative Binomial if overdispersion
				{
					double lambda = mu;
					if (dispersion > 1.000000001)
					{
						// output = Negative Binomial random variable with:
						//	 Number of failures = mu / (dispersion - 1.0)
						//	 Probability of success = 1.0 - 1.0 / dispersion
						lambda = (dispersion - 1.0) * TestUtils.nextGamma (r, mu / (dispersion - 1.0));
					}
					output = TestUtils.nextPoisson (r, lambda);
				}
				else if (var_pow == 2.0) // Gamma
				{
					double beta = dispersion * mu;
					output = beta * TestUtils.nextGamma (r, mu / beta);
				}
				else if (var_pow == 3.0) // Inverse Gaussian
				{
					// From: Raj Chhikara, J.L. Folks.  The Inverse Gaussian Distribution:
					// Theory: Methodology, and Applications.  CRC Press, 1988, Section 4.5
					double y_Gauss = r.nextGaussian ();
					double mu_y_sq = mu * y_Gauss * y_Gauss;
					double x_invG = 0.5 * dispersion * mu * (2.0 / dispersion + mu_y_sq
							- Math.sqrt (mu_y_sq * (4.0 / dispersion + mu_y_sq)));
					output = ((mu + x_invG) * r.nextDouble() < mu ? x_invG : (mu * mu / x_invG));
				}
				else
				{
					output = mu + Math.sqrt (12.0 * dispersion) * (r.nextDouble () - 0.5);
				}
			}
			else if (dist == 2 && param != 1.0) // Binomial, dispersion ignored
			{
				double bernoulli_zero = param;
				output = (r.nextDouble () < mu ? 1.0 : bernoulli_zero);
			}
			else if (dist == 2) // param == 1.0, Binomial Two-Column, dispersion used
			{
				double alpha_plus_beta = (binom_n - dispersion) / (dispersion - 1.0);
				double alpha = mu * alpha_plus_beta;
				double x = TestUtils.nextGamma (r, alpha);
				double y = TestUtils.nextGamma (r, alpha_plus_beta - alpha);
				double p = x / (x + y);
				long out = 0;
				for (long i = 0; i < binom_n; i++)
					if (r.nextDouble() < p)
						out ++;
				output = out;
			}
			return output;
		}
	}

	public static double[][] generateUnbalancedGLMInputDataX(int rows, int cols, double logFeatureVarianceDisbalance) {
		double[][] X = generateTestMatrix(rows, cols, -1.0, 1.0, 1.0, 34567);
		double shift_X = 1.0;
		// make the feature columns of X variance disbalanced
		for (int j = 0; j < cols; j++) {
			double varFactor = Math.pow(10.0, logFeatureVarianceDisbalance * (-0.25 + j / (double) (2 * cols - 2)));
			for (int i = 0; i < rows; i++)
				X[i][j] = shift_X + X[i][j] * varFactor;
		}
		return X;
	}

	public static double[] generateUnbalancedGLMInputDataB(double[][] X, int cols, double intercept, double avgLinearForm, double stdevLinearForm, Random r) {
		double[] beta_unscaled = new double[cols];
		for (int j = 0; j < cols; j++)
			beta_unscaled[j] = r.nextGaussian();
		return scaleWeights(beta_unscaled, X, intercept, avgLinearForm, stdevLinearForm);
	}

	public static double[][] generateUnbalancedGLMInputDataY(double[][] X, double[] beta, int rows, int cols, GLMDist glmdist, double intercept, double dispersion, Random r) {
		double[][] y = null;
		if (glmdist.is_binom_n_needed())
			y = new double[rows][2];
		else
			y = new double[rows][1];

		for (int i = 0; i < rows; i++) {
			double eta = intercept;
			for (int j = 0; j < cols; j++) {
				eta += X[i][j] * beta[j];
			}
			if (glmdist.is_binom_n_needed()) {
				long n = Math.round(dispersion * (1.0 + 2.0 * r.nextDouble()) + 1.0);
				glmdist.set_binom_n(n);
				y[i][0] = glmdist.nextGLM(r, eta);
				y[i][1] = n - y[i][0];
			}
			else {
				y[i][0] = glmdist.nextGLM(r, eta);
			}
		}

		return y;
	}

	public static boolean containsNan(double[][] data, int col) {
		for (double[] datum : data)
			if (Double.isNaN(datum[col]))
				return true;
		return false;
	}
	
	public static int isGPUAvailable() {
		// returns cudaSuccess if at least one gpu is available
		//final int[] deviceCount = new int[1];
		//return JCuda.cudaGetDeviceCount(deviceCount);
		// FIXME: Fails to skip if gpu available but no libraries
		return 1; //return false for now
	}

	public static MatrixBlock mockNonContiguousMatrix(MatrixBlock db){
		db.sparseToDense();
		double[] vals = db.getDenseBlockValues();
		int[] dims = new int[]{db.getNumRows(), db.getNumColumns()};
		MatrixBlock m = new MatrixBlock(db.getNumRows(), db.getNumColumns(), new DenseBlockFP64Mock(dims, vals));
		m.setNonZeros(db.getNonZeros());
		return m;
	}

	private static class DenseBlockFP64Mock extends DenseBlockFP64 {
		private static final long serialVersionUID = -3601232958390554672L;

		public DenseBlockFP64Mock(int[] dims, double[] data) {
			super(dims, data);
		}

		@Override
		public boolean isContiguous() {
			return false;
		}

		@Override
		public int numBlocks() {
			return 2;
		}
	}
}
